You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by bi...@apache.org on 2014/04/28 01:55:29 UTC

git commit: TEZ-1062. Create SimpleProcessor for processors that only need to implement the run method (Mohammad Kamrul Islam via bikas)

Repository: incubator-tez
Updated Branches:
  refs/heads/master 2efe086d8 -> b084c7f8d


TEZ-1062. Create SimpleProcessor for processors that only need to implement the run method (Mohammad Kamrul Islam via bikas)


Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/b084c7f8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/b084c7f8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/b084c7f8

Branch: refs/heads/master
Commit: b084c7f8d12ff38ff3d824734e009a1fc71ee20b
Parents: 2efe086
Author: Bikas Saha <bi...@apache.org>
Authored: Sun Apr 27 16:55:16 2014 -0700
Committer: Bikas Saha <bi...@apache.org>
Committed: Sun Apr 27 16:55:16 2014 -0700

----------------------------------------------------------------------
 .../examples/BroadcastAndOneToOneExample.java   |  67 +++---------
 .../tez/mapreduce/examples/UnionExample.java    | 102 +++++--------------
 .../tez/mapreduce/examples/WordCount.java       |  87 +++-------------
 .../mapreduce/processor/SimpleMRProcessor.java  |  73 +++++++++++++
 .../library/processor/SimpleProcessor.java      |  83 +++++++++++++++
 5 files changed, 213 insertions(+), 199 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b084c7f8/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/BroadcastAndOneToOneExample.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/BroadcastAndOneToOneExample.java b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/BroadcastAndOneToOneExample.java
index db3e1cf..22e11ba 100644
--- a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/BroadcastAndOneToOneExample.java
+++ b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/BroadcastAndOneToOneExample.java
@@ -56,72 +56,37 @@ import org.apache.tez.dag.library.vertexmanager.InputReadyVertexManager;
 import org.apache.tez.mapreduce.hadoop.MRHelpers;
 import org.apache.tez.mapreduce.hadoop.MRJobConfig;
 import org.apache.tez.mapreduce.hadoop.MultiStageMRConfToTezTranslator;
-import org.apache.tez.runtime.api.AbstractLogicalIOProcessor;
-import org.apache.tez.runtime.api.Event;
-import org.apache.tez.runtime.api.LogicalInput;
-import org.apache.tez.runtime.api.LogicalOutput;
 import org.apache.tez.runtime.library.api.KeyValueReader;
 import org.apache.tez.runtime.library.api.KeyValueWriter;
 import org.apache.tez.runtime.library.input.ShuffledUnorderedKVInput;
 import org.apache.tez.runtime.library.output.OnFileUnorderedKVOutput;
+import org.apache.tez.runtime.library.processor.SimpleProcessor;
 
 import com.google.common.base.Preconditions;
 
 public class BroadcastAndOneToOneExample {
-  public static class InputProcessor extends AbstractLogicalIOProcessor {
+  public static class InputProcessor extends SimpleProcessor {
     Text word = new Text();
 
     @Override
-    public void initialize() throws Exception {
-    }
-
-    @Override
-    public void handleEvents(List<Event> processorEvents) {
-    }
-
-    @Override
-    public void close() throws Exception {
-    }
-
-    @Override
-    public void run(Map<String, LogicalInput> inputs,
-        Map<String, LogicalOutput> outputs) throws Exception {
-      for (LogicalOutput output : outputs.values()) {
-        output.start();
-      }
-      Preconditions.checkArgument(outputs.size() == 1);
-      OnFileUnorderedKVOutput output = (OnFileUnorderedKVOutput) outputs.values().iterator().next();
+    public void run() throws Exception {
+      Preconditions.checkArgument(getOutputs().size() == 1);
+      OnFileUnorderedKVOutput output = (OnFileUnorderedKVOutput) getOutputs().values().iterator()
+          .next();
       KeyValueWriter kvWriter = (KeyValueWriter) output.getWriter();
       kvWriter.write(word, new IntWritable(getContext().getTaskIndex()));
     }
   }
-  
-  public static class OneToOneProcessor extends AbstractLogicalIOProcessor {
-    Text word = new Text();
-    
-    @Override
-    public void initialize() throws Exception {
-    }
-
-    @Override
-    public void handleEvents(List<Event> processorEvents) {
-    }
 
-    @Override
-    public void close() throws Exception {
-    }
+  public static class OneToOneProcessor extends SimpleProcessor {
+    Text word = new Text();
 
     @Override
-    public void run(Map<String, LogicalInput> inputs,
-        Map<String, LogicalOutput> outputs) throws Exception {
+    public void run() throws Exception {
       Preconditions.checkArgument(inputs.size() == 2);
 
-      for (LogicalInput input : inputs.values()) {
-        input.start();
-      }
-
-      KeyValueReader inputKvReader = (KeyValueReader) inputs.get("Input").getReader();
-      KeyValueReader broadcastKvReader = (KeyValueReader) inputs.get("Broadcast").getReader();
+      KeyValueReader inputKvReader = (KeyValueReader) getInputs().get("Input").getReader();
+      KeyValueReader broadcastKvReader = (KeyValueReader) getInputs().get("Broadcast").getReader();
       int sum = 0;
       while (broadcastKvReader.next()) {
         sum += ((IntWritable) broadcastKvReader.getCurrentValue()).get();
@@ -133,21 +98,21 @@ public class BroadcastAndOneToOneExample {
       int taskIndex = getContext().getTaskIndex();
       switch (taskIndex) {
       case 0:
-        Preconditions.checkState((sum==1), "Sum = " + sum);
+        Preconditions.checkState((sum == 1), "Sum = " + sum);
         break;
       case 1:
-        Preconditions.checkState((sum==2), "Sum = " + sum);
+        Preconditions.checkState((sum == 2), "Sum = " + sum);
         break;
       case 2:
-        Preconditions.checkState((sum==3), "Sum = " + sum);
+        Preconditions.checkState((sum == 3), "Sum = " + sum);
         break;
       default:
         throw new TezUncheckedException("Unexpected taskIndex: " + taskIndex);
       }
     }
-    
+
   }
-  
+
   private DAG createDAG(FileSystem fs, TezConfiguration tezConf,
       Path stagingDir) throws IOException {
     Configuration kvInputConf = new JobConf((Configuration)tezConf);

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b084c7f8/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/UnionExample.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/UnionExample.java b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/UnionExample.java
index 91eb297..e108e82 100644
--- a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/UnionExample.java
+++ b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/UnionExample.java
@@ -19,7 +19,6 @@ package org.apache.tez.mapreduce.examples;
 
 import java.io.IOException;
 import java.util.EnumSet;
-import java.util.List;
 import java.util.Map;
 import java.util.StringTokenizer;
 import java.util.TreeMap;
@@ -67,62 +66,42 @@ import org.apache.tez.mapreduce.common.MRInputAMSplitGenerator;
 import org.apache.tez.mapreduce.hadoop.MRHelpers;
 import org.apache.tez.mapreduce.input.MRInput;
 import org.apache.tez.mapreduce.output.MROutput;
-import org.apache.tez.runtime.api.AbstractLogicalIOProcessor;
-import org.apache.tez.runtime.api.Event;
+import org.apache.tez.mapreduce.processor.SimpleMRProcessor;
 import org.apache.tez.runtime.api.LogicalInput;
-import org.apache.tez.runtime.api.LogicalOutput;
 import org.apache.tez.runtime.library.api.KeyValueReader;
 import org.apache.tez.runtime.library.api.KeyValueWriter;
 import org.apache.tez.runtime.library.api.KeyValuesReader;
 import org.apache.tez.runtime.library.input.ConcatenatedMergedKeyValuesInput;
 import org.apache.tez.runtime.library.input.ShuffledMergedInput;
 import org.apache.tez.runtime.library.output.OnFileSortedOutput;
+import org.apache.tez.runtime.library.processor.SimpleProcessor;
 
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Maps;
 
 public class UnionExample {
 
-  public static class TokenProcessor extends AbstractLogicalIOProcessor {
+  public static class TokenProcessor extends SimpleProcessor {
     IntWritable one = new IntWritable(1);
     Text word = new Text();
 
     @Override
-    public void initialize() throws Exception {
-    }
-
-    @Override
-    public void handleEvents(List<Event> processorEvents) {
-    }
-
-    @Override
-    public void close() throws Exception {
-    }
-
-    @Override
-    public void run(Map<String, LogicalInput> inputs,
-        Map<String, LogicalOutput> outputs) throws Exception {
-      Preconditions.checkArgument(inputs.size() == 1);
-      for (LogicalInput input : inputs.values()) {
-        input.start();
-      }
-      for (LogicalOutput output : outputs.values()) {
-        output.start();
-      }
+    public void run() throws Exception {
+      Preconditions.checkArgument(getInputs().size() == 1);
       boolean inUnion = true;
       if (getContext().getTaskVertexName().equals("map3")) {
         inUnion = false;
       }
-      Preconditions.checkArgument(outputs.size() == (inUnion ? 2 : 1));
-      Preconditions.checkArgument(outputs.containsKey("checker"));
-      MRInput input = (MRInput) inputs.values().iterator().next();
+      Preconditions.checkArgument(getOutputs().size() == (inUnion ? 2 : 1));
+      Preconditions.checkArgument(getOutputs().containsKey("checker"));
+      MRInput input = (MRInput) getInputs().values().iterator().next();
       KeyValueReader kvReader = input.getReader();
-      OnFileSortedOutput output = (OnFileSortedOutput) outputs.get("checker");
+      OnFileSortedOutput output = (OnFileSortedOutput) getOutputs().get("checker");
       KeyValueWriter kvWriter = output.getWriter();
       MROutput parts = null;
       KeyValueWriter partsWriter = null;
       if (inUnion) {
-        parts = (MROutput) outputs.get("parts");
+        parts = (MROutput) getOutputs().get("parts");
         partsWriter = parts.getWriter();
       }
       while (kvReader.next()) {
@@ -144,46 +123,27 @@ public class UnionExample {
         }
       }
     }
-    
-  }
-  
-  public static class UnionProcessor extends AbstractLogicalIOProcessor {
-    IntWritable one = new IntWritable(1);
-    
-    @Override
-    public void initialize() throws Exception {
-    }
 
-    @Override
-    public void handleEvents(List<Event> processorEvents) {
-    }
+  }
 
-    @Override
-    public void close() throws Exception {
-    }
+  public static class UnionProcessor extends SimpleMRProcessor {
+    IntWritable one = new IntWritable(1);
 
     @Override
-    public void run(Map<String, LogicalInput> inputs,
-        Map<String, LogicalOutput> outputs) throws Exception {
-      Preconditions.checkArgument(inputs.size() == 2);
-      Preconditions.checkArgument(outputs.size() == 2);
-      for (LogicalInput input : inputs.values()) {
-        input.start();
-      }
-      for (LogicalOutput output : outputs.values()) {
-        output.start();
-      }
-      MROutput out = (MROutput) outputs.get("union");
-      MROutput allParts = (MROutput) outputs.get("all-parts");
+    public void run() throws Exception {
+      Preconditions.checkArgument(getInputs().size() == 2);
+      Preconditions.checkArgument(getOutputs().size() == 2);
+      MROutput out = (MROutput) getOutputs().get("union");
+      MROutput allParts = (MROutput) getOutputs().get("all-parts");
       KeyValueWriter kvWriter = out.getWriter();
       KeyValueWriter partsWriter = allParts.getWriter();
       Map<String, AtomicInteger> unionKv = Maps.newHashMap();
-      LogicalInput union = inputs.get("union");
+      LogicalInput union = getInputs().get("union");
       KeyValuesReader kvReader = (KeyValuesReader) union.getReader();
       while (kvReader.next()) {
         String word = ((Text) kvReader.getCurrentKey()).toString();
         IntWritable intVal = (IntWritable) kvReader.getCurrentValues().iterator().next();
-        for (int i=0; i<intVal.get(); ++i) {
+        for (int i = 0; i < intVal.get(); ++i) {
           partsWriter.write(word, one);
         }
         AtomicInteger value = unionKv.get(word);
@@ -193,16 +153,16 @@ public class UnionExample {
           value.addAndGet(intVal.get());
         }
       }
-      LogicalInput map3 = inputs.get("map3");
+      LogicalInput map3 = getInputs().get("map3");
       kvReader = (KeyValuesReader) map3.getReader();
       while (kvReader.next()) {
         String word = ((Text) kvReader.getCurrentKey()).toString();
         IntWritable intVal = (IntWritable) kvReader.getCurrentValues().iterator().next();
         AtomicInteger value = unionKv.get(word);
-        if  (value == null) {
+        if (value == null) {
           throw new TezUncheckedException("Expected to exist: " + word);
         } else {
-          value.getAndAdd(intVal.get()*-2);
+          value.getAndAdd(intVal.get() * -2);
         }
       }
       for (AtomicInteger value : unionKv.values()) {
@@ -211,22 +171,10 @@ public class UnionExample {
         }
       }
       kvWriter.write("Union", new IntWritable(unionKv.size()));
-      if (out.isCommitRequired()) {
-        while (!getContext().canCommit()) {
-          Thread.sleep(100);
-        }
-        out.commit();
-      }
-      if (allParts.isCommitRequired()) {
-        while (!getContext().canCommit()) {
-          Thread.sleep(100);
-        }
-        allParts.commit();
-      }
     }
-    
+
   }
-  
+
   private DAG createDAG(FileSystem fs, TezConfiguration tezConf,
       Map<String, LocalResource> localResources, Path stagingDir,
       String inputPath, String outputPath) throws IOException {

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b084c7f8/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/WordCount.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/WordCount.java b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/WordCount.java
index 0e8aaba..8a90259 100644
--- a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/WordCount.java
+++ b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/WordCount.java
@@ -18,7 +18,6 @@
 package org.apache.tez.mapreduce.examples;
 
 import java.io.IOException;
-import java.util.List;
 import java.util.Map;
 import java.util.StringTokenizer;
 import java.util.TreeMap;
@@ -62,10 +61,7 @@ import org.apache.tez.mapreduce.common.MRInputAMSplitGenerator;
 import org.apache.tez.mapreduce.hadoop.MRHelpers;
 import org.apache.tez.mapreduce.input.MRInput;
 import org.apache.tez.mapreduce.output.MROutput;
-import org.apache.tez.runtime.api.AbstractLogicalIOProcessor;
-import org.apache.tez.runtime.api.Event;
-import org.apache.tez.runtime.api.LogicalInput;
-import org.apache.tez.runtime.api.LogicalOutput;
+import org.apache.tez.mapreduce.processor.SimpleMRProcessor;
 import org.apache.tez.runtime.library.api.KeyValueReader;
 import org.apache.tez.runtime.library.api.KeyValueWriter;
 import org.apache.tez.runtime.library.api.KeyValuesReader;
@@ -74,25 +70,19 @@ import org.apache.tez.runtime.library.output.OnFileSortedOutput;
 
 import com.google.common.base.Preconditions;
 
+
 public class WordCount {
-  public static class TokenProcessor extends AbstractLogicalIOProcessor {
+  public static class TokenProcessor extends SimpleMRProcessor {
     IntWritable one = new IntWritable(1);
     Text word = new Text();
 
     @Override
-    public void run(Map<String, LogicalInput> inputs,
-        Map<String, LogicalOutput> outputs) throws Exception {
-      for (LogicalInput input : inputs.values()) {
-        input.start();
-      }
-      for (LogicalOutput output : outputs.values()) {
-        output.start();
-      }
-      Preconditions.checkArgument(inputs.size() == 1);
-      Preconditions.checkArgument(outputs.size() == 1);
-      MRInput input = (MRInput) inputs.values().iterator().next();
+    public void run() throws Exception {
+      Preconditions.checkArgument(getInputs().size() == 1);
+      Preconditions.checkArgument(getOutputs().size() == 1);
+      MRInput input = (MRInput) getInputs().values().iterator().next();
       KeyValueReader kvReader = input.getReader();
-      OnFileSortedOutput output = (OnFileSortedOutput) outputs.values().iterator().next();
+      OnFileSortedOutput output = (OnFileSortedOutput) getOutputs().values().iterator().next();
       KeyValueWriter kvWriter = (KeyValueWriter) output.getWriter();
       while (kvReader.next()) {
         StringTokenizer itr = new StringTokenizer(kvReader.getCurrentValue().toString());
@@ -103,39 +93,16 @@ public class WordCount {
       }
     }
 
-    @Override
-    public void initialize() throws Exception {
-
-    }
-
-    @Override
-    public void handleEvents(List<Event> processorEvents) {
-      
-    }
-
-    @Override
-    public void close() throws Exception {
-      
-    }
-
   }
-  
-  public static class SumProcessor extends AbstractLogicalIOProcessor {
-    @Override
-    public void run(Map<String, LogicalInput> inputs,
-        Map<String, LogicalOutput> outputs) throws Exception {
-      Preconditions.checkArgument(inputs.size() == 1);
-
-      for (LogicalInput input : inputs.values()) {
-        input.start();
-      }
-      for (LogicalOutput output : outputs.values()) {
-        output.start();
-      }
 
-      MROutput out = (MROutput) outputs.values().iterator().next();
+  public static class SumProcessor extends SimpleMRProcessor {
+    @Override
+    public void run() throws Exception {
+      Preconditions.checkArgument(getInputs().size() == 1);
+      MROutput out = (MROutput) getOutputs().values().iterator().next();
       KeyValueWriter kvWriter = out.getWriter();
-      KeyValuesReader kvReader = (KeyValuesReader) inputs.values().iterator().next().getReader();
+      KeyValuesReader kvReader = (KeyValuesReader) getInputs().values().iterator().next()
+          .getReader();
       while (kvReader.next()) {
         Text word = (Text) kvReader.getCurrentKey();
         int sum = 0;
@@ -144,31 +111,9 @@ public class WordCount {
         }
         kvWriter.write(word, new IntWritable(sum));
       }
-      if (out.isCommitRequired()) {
-        while (!getContext().canCommit()) {
-          Thread.sleep(100);
-        }
-        out.commit();
-      }
     }
-
-    @Override
-    public void initialize() throws Exception {
-
-    }
-
-    @Override
-    public void handleEvents(List<Event> processorEvents) {
-      
-    }
-
-    @Override
-    public void close() throws Exception {
-      
-    }
-
   }
-  
+
   private DAG createDAG(FileSystem fs, TezConfiguration tezConf,
       Map<String, LocalResource> localResources, Path stagingDir,
       String inputPath, String outputPath) throws IOException {

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b084c7f8/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/SimpleMRProcessor.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/SimpleMRProcessor.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/SimpleMRProcessor.java
new file mode 100644
index 0000000..5fdcec0
--- /dev/null
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/SimpleMRProcessor.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.mapreduce.processor;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.tez.mapreduce.output.MROutput;
+import org.apache.tez.runtime.api.LogicalOutput;
+import org.apache.tez.runtime.library.processor.SimpleProcessor;
+
+import com.google.common.collect.Lists;
+
+public abstract class SimpleMRProcessor extends SimpleProcessor { // SimpleProcessor that commits MROutputs in postOp()
+  private static final Log LOG = LogFactory.getLog(SimpleMRProcessor.class);
+  // Post-run hook: commits each MROutput needing a commit; on a commit failure aborts them all and rethrows.
+  @Override
+  protected void postOp() throws Exception {
+    if (getOutputs() == null) {
+      return; // no output map was supplied, so there is nothing to commit
+    }
+    List<MROutput> mrOuts = Lists.newLinkedList(); // MROutputs that report isCommitRequired()
+    for (LogicalOutput output : getOutputs().values()) {
+      if ((output instanceof MROutput) && (((MROutput) output).isCommitRequired())) {
+        mrOuts.add((MROutput) output);
+      }
+    }
+    if (mrOuts.size() > 0) {
+      while (!getContext().canCommit()) { // poll every 100 ms until canCommit() returns true
+        Thread.sleep(100);
+      }
+      boolean willAbort = false;
+      Exception savedEx = null; // the commit failure, rethrown after the abort pass below
+      for (MROutput output : mrOuts) {
+        try {
+          output.commit();
+        } catch (IOException ioe) {
+          LOG.warn("Error in committing output", ioe);
+          willAbort = true;
+          savedEx = ioe;
+          break; // stop at the first failed commit; later outputs are never committed
+        }
+      }
+      if (willAbort == true) {
+        for (MROutput output : mrOuts) { // abort() is called on every collected output, committed or not
+          try {
+            output.abort();
+          } catch (IOException ioe) {
+            LOG.warn("Error in aborting output", ioe); // only logged, so the remaining outputs still get aborted
+          }
+        }
+        throw savedEx; // surface the original commit failure to the caller
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b084c7f8/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/processor/SimpleProcessor.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/processor/SimpleProcessor.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/processor/SimpleProcessor.java
new file mode 100644
index 0000000..60eae30
--- /dev/null
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/processor/SimpleProcessor.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.runtime.library.processor;
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.tez.runtime.api.AbstractLogicalIOProcessor;
+import org.apache.tez.runtime.api.Event;
+import org.apache.tez.runtime.api.LogicalInput;
+import org.apache.tez.runtime.api.LogicalOutput;
+
+public abstract class SimpleProcessor extends AbstractLogicalIOProcessor { // subclasses only implement run()
+  protected Map<String, LogicalInput> inputs; // set by run(Map, Map); null before that call
+  protected Map<String, LogicalOutput> outputs; // set by run(Map, Map); null before that call
+  // Stores the supplied maps, then calls preOp(), run() and postOp() in that order.
+  public void run(Map<String, LogicalInput> _inputs, Map<String, LogicalOutput> _outputs)
+      throws Exception {
+    this.inputs = _inputs;
+    this.outputs = _outputs;
+    preOp();
+    run(); // no try/finally: if preOp() or run() throws, postOp() is not called
+    postOp();
+  }
+  // Subclass logic; inputs and outputs are reachable through getInputs() and getOutputs().
+  public abstract void run() throws Exception;
+  // Pre-run hook: calls start() on every input, then on every output; a null map is skipped.
+  protected void preOp() throws Exception {
+    if (getInputs() != null) {
+      for (LogicalInput input : getInputs().values()) {
+        input.start();
+      }
+    }
+    if (getOutputs() != null) {
+      for (LogicalOutput output : getOutputs().values()) {
+        output.start();
+      }
+    }
+  }
+  // Post-run hook, called after run() returns normally; subclasses may override it.
+  protected void postOp() throws Exception {
+   // No-op in this base class
+  }
+
+  @Override
+  public void initialize() throws Exception {
+    // intentionally empty
+  }
+
+  @Override
+  public void handleEvents(List<Event> processorEvents) {
+    // intentionally empty: the events passed in are ignored
+  }
+
+  @Override
+  public void close() throws Exception {
+    // intentionally empty
+  }
+  // Returns the input map stored by run(Map, Map); null if that method has not been called.
+  public Map<String, LogicalInput> getInputs() {
+    return inputs;
+  }
+  // Returns the output map stored by run(Map, Map); null if that method has not been called.
+  public Map<String, LogicalOutput> getOutputs() {
+    return outputs;
+  }
+
+}