You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by bi...@apache.org on 2014/04/28 01:55:29 UTC
git commit: TEZ-1062. Create SimpleProcessor for processors that only need to implement the run method (Mohammad Kamrul Islam via bikas)
Repository: incubator-tez
Updated Branches:
refs/heads/master 2efe086d8 -> b084c7f8d
TEZ-1062. Create SimpleProcessor for processors that only need to implement the run method (Mohammad Kamrul Islam via bikas)
Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/b084c7f8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/b084c7f8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/b084c7f8
Branch: refs/heads/master
Commit: b084c7f8d12ff38ff3d824734e009a1fc71ee20b
Parents: 2efe086
Author: Bikas Saha <bi...@apache.org>
Authored: Sun Apr 27 16:55:16 2014 -0700
Committer: Bikas Saha <bi...@apache.org>
Committed: Sun Apr 27 16:55:16 2014 -0700
----------------------------------------------------------------------
.../examples/BroadcastAndOneToOneExample.java | 67 +++---------
.../tez/mapreduce/examples/UnionExample.java | 102 +++++--------------
.../tez/mapreduce/examples/WordCount.java | 87 +++-------------
.../mapreduce/processor/SimpleMRProcessor.java | 73 +++++++++++++
.../library/processor/SimpleProcessor.java | 83 +++++++++++++++
5 files changed, 213 insertions(+), 199 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b084c7f8/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/BroadcastAndOneToOneExample.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/BroadcastAndOneToOneExample.java b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/BroadcastAndOneToOneExample.java
index db3e1cf..22e11ba 100644
--- a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/BroadcastAndOneToOneExample.java
+++ b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/BroadcastAndOneToOneExample.java
@@ -56,72 +56,37 @@ import org.apache.tez.dag.library.vertexmanager.InputReadyVertexManager;
import org.apache.tez.mapreduce.hadoop.MRHelpers;
import org.apache.tez.mapreduce.hadoop.MRJobConfig;
import org.apache.tez.mapreduce.hadoop.MultiStageMRConfToTezTranslator;
-import org.apache.tez.runtime.api.AbstractLogicalIOProcessor;
-import org.apache.tez.runtime.api.Event;
-import org.apache.tez.runtime.api.LogicalInput;
-import org.apache.tez.runtime.api.LogicalOutput;
import org.apache.tez.runtime.library.api.KeyValueReader;
import org.apache.tez.runtime.library.api.KeyValueWriter;
import org.apache.tez.runtime.library.input.ShuffledUnorderedKVInput;
import org.apache.tez.runtime.library.output.OnFileUnorderedKVOutput;
+import org.apache.tez.runtime.library.processor.SimpleProcessor;
import com.google.common.base.Preconditions;
public class BroadcastAndOneToOneExample {
- public static class InputProcessor extends AbstractLogicalIOProcessor {
+ public static class InputProcessor extends SimpleProcessor {
Text word = new Text();
@Override
- public void initialize() throws Exception {
- }
-
- @Override
- public void handleEvents(List<Event> processorEvents) {
- }
-
- @Override
- public void close() throws Exception {
- }
-
- @Override
- public void run(Map<String, LogicalInput> inputs,
- Map<String, LogicalOutput> outputs) throws Exception {
- for (LogicalOutput output : outputs.values()) {
- output.start();
- }
- Preconditions.checkArgument(outputs.size() == 1);
- OnFileUnorderedKVOutput output = (OnFileUnorderedKVOutput) outputs.values().iterator().next();
+ public void run() throws Exception {
+ Preconditions.checkArgument(getOutputs().size() == 1);
+ OnFileUnorderedKVOutput output = (OnFileUnorderedKVOutput) getOutputs().values().iterator()
+ .next();
KeyValueWriter kvWriter = (KeyValueWriter) output.getWriter();
kvWriter.write(word, new IntWritable(getContext().getTaskIndex()));
}
}
-
- public static class OneToOneProcessor extends AbstractLogicalIOProcessor {
- Text word = new Text();
-
- @Override
- public void initialize() throws Exception {
- }
-
- @Override
- public void handleEvents(List<Event> processorEvents) {
- }
- @Override
- public void close() throws Exception {
- }
+ public static class OneToOneProcessor extends SimpleProcessor {
+ Text word = new Text();
@Override
- public void run(Map<String, LogicalInput> inputs,
- Map<String, LogicalOutput> outputs) throws Exception {
+ public void run() throws Exception {
Preconditions.checkArgument(inputs.size() == 2);
- for (LogicalInput input : inputs.values()) {
- input.start();
- }
-
- KeyValueReader inputKvReader = (KeyValueReader) inputs.get("Input").getReader();
- KeyValueReader broadcastKvReader = (KeyValueReader) inputs.get("Broadcast").getReader();
+ KeyValueReader inputKvReader = (KeyValueReader) getInputs().get("Input").getReader();
+ KeyValueReader broadcastKvReader = (KeyValueReader) getInputs().get("Broadcast").getReader();
int sum = 0;
while (broadcastKvReader.next()) {
sum += ((IntWritable) broadcastKvReader.getCurrentValue()).get();
@@ -133,21 +98,21 @@ public class BroadcastAndOneToOneExample {
int taskIndex = getContext().getTaskIndex();
switch (taskIndex) {
case 0:
- Preconditions.checkState((sum==1), "Sum = " + sum);
+ Preconditions.checkState((sum == 1), "Sum = " + sum);
break;
case 1:
- Preconditions.checkState((sum==2), "Sum = " + sum);
+ Preconditions.checkState((sum == 2), "Sum = " + sum);
break;
case 2:
- Preconditions.checkState((sum==3), "Sum = " + sum);
+ Preconditions.checkState((sum == 3), "Sum = " + sum);
break;
default:
throw new TezUncheckedException("Unexpected taskIndex: " + taskIndex);
}
}
-
+
}
-
+
private DAG createDAG(FileSystem fs, TezConfiguration tezConf,
Path stagingDir) throws IOException {
Configuration kvInputConf = new JobConf((Configuration)tezConf);
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b084c7f8/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/UnionExample.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/UnionExample.java b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/UnionExample.java
index 91eb297..e108e82 100644
--- a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/UnionExample.java
+++ b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/UnionExample.java
@@ -19,7 +19,6 @@ package org.apache.tez.mapreduce.examples;
import java.io.IOException;
import java.util.EnumSet;
-import java.util.List;
import java.util.Map;
import java.util.StringTokenizer;
import java.util.TreeMap;
@@ -67,62 +66,42 @@ import org.apache.tez.mapreduce.common.MRInputAMSplitGenerator;
import org.apache.tez.mapreduce.hadoop.MRHelpers;
import org.apache.tez.mapreduce.input.MRInput;
import org.apache.tez.mapreduce.output.MROutput;
-import org.apache.tez.runtime.api.AbstractLogicalIOProcessor;
-import org.apache.tez.runtime.api.Event;
+import org.apache.tez.mapreduce.processor.SimpleMRProcessor;
import org.apache.tez.runtime.api.LogicalInput;
-import org.apache.tez.runtime.api.LogicalOutput;
import org.apache.tez.runtime.library.api.KeyValueReader;
import org.apache.tez.runtime.library.api.KeyValueWriter;
import org.apache.tez.runtime.library.api.KeyValuesReader;
import org.apache.tez.runtime.library.input.ConcatenatedMergedKeyValuesInput;
import org.apache.tez.runtime.library.input.ShuffledMergedInput;
import org.apache.tez.runtime.library.output.OnFileSortedOutput;
+import org.apache.tez.runtime.library.processor.SimpleProcessor;
import com.google.common.base.Preconditions;
import com.google.common.collect.Maps;
public class UnionExample {
- public static class TokenProcessor extends AbstractLogicalIOProcessor {
+ public static class TokenProcessor extends SimpleProcessor {
IntWritable one = new IntWritable(1);
Text word = new Text();
@Override
- public void initialize() throws Exception {
- }
-
- @Override
- public void handleEvents(List<Event> processorEvents) {
- }
-
- @Override
- public void close() throws Exception {
- }
-
- @Override
- public void run(Map<String, LogicalInput> inputs,
- Map<String, LogicalOutput> outputs) throws Exception {
- Preconditions.checkArgument(inputs.size() == 1);
- for (LogicalInput input : inputs.values()) {
- input.start();
- }
- for (LogicalOutput output : outputs.values()) {
- output.start();
- }
+ public void run() throws Exception {
+ Preconditions.checkArgument(getInputs().size() == 1);
boolean inUnion = true;
if (getContext().getTaskVertexName().equals("map3")) {
inUnion = false;
}
- Preconditions.checkArgument(outputs.size() == (inUnion ? 2 : 1));
- Preconditions.checkArgument(outputs.containsKey("checker"));
- MRInput input = (MRInput) inputs.values().iterator().next();
+ Preconditions.checkArgument(getOutputs().size() == (inUnion ? 2 : 1));
+ Preconditions.checkArgument(getOutputs().containsKey("checker"));
+ MRInput input = (MRInput) getInputs().values().iterator().next();
KeyValueReader kvReader = input.getReader();
- OnFileSortedOutput output = (OnFileSortedOutput) outputs.get("checker");
+ OnFileSortedOutput output = (OnFileSortedOutput) getOutputs().get("checker");
KeyValueWriter kvWriter = output.getWriter();
MROutput parts = null;
KeyValueWriter partsWriter = null;
if (inUnion) {
- parts = (MROutput) outputs.get("parts");
+ parts = (MROutput) getOutputs().get("parts");
partsWriter = parts.getWriter();
}
while (kvReader.next()) {
@@ -144,46 +123,27 @@ public class UnionExample {
}
}
}
-
- }
-
- public static class UnionProcessor extends AbstractLogicalIOProcessor {
- IntWritable one = new IntWritable(1);
-
- @Override
- public void initialize() throws Exception {
- }
- @Override
- public void handleEvents(List<Event> processorEvents) {
- }
+ }
- @Override
- public void close() throws Exception {
- }
+ public static class UnionProcessor extends SimpleMRProcessor {
+ IntWritable one = new IntWritable(1);
@Override
- public void run(Map<String, LogicalInput> inputs,
- Map<String, LogicalOutput> outputs) throws Exception {
- Preconditions.checkArgument(inputs.size() == 2);
- Preconditions.checkArgument(outputs.size() == 2);
- for (LogicalInput input : inputs.values()) {
- input.start();
- }
- for (LogicalOutput output : outputs.values()) {
- output.start();
- }
- MROutput out = (MROutput) outputs.get("union");
- MROutput allParts = (MROutput) outputs.get("all-parts");
+ public void run() throws Exception {
+ Preconditions.checkArgument(getInputs().size() == 2);
+ Preconditions.checkArgument(getOutputs().size() == 2);
+ MROutput out = (MROutput) getOutputs().get("union");
+ MROutput allParts = (MROutput) getOutputs().get("all-parts");
KeyValueWriter kvWriter = out.getWriter();
KeyValueWriter partsWriter = allParts.getWriter();
Map<String, AtomicInteger> unionKv = Maps.newHashMap();
- LogicalInput union = inputs.get("union");
+ LogicalInput union = getInputs().get("union");
KeyValuesReader kvReader = (KeyValuesReader) union.getReader();
while (kvReader.next()) {
String word = ((Text) kvReader.getCurrentKey()).toString();
IntWritable intVal = (IntWritable) kvReader.getCurrentValues().iterator().next();
- for (int i=0; i<intVal.get(); ++i) {
+ for (int i = 0; i < intVal.get(); ++i) {
partsWriter.write(word, one);
}
AtomicInteger value = unionKv.get(word);
@@ -193,16 +153,16 @@ public class UnionExample {
value.addAndGet(intVal.get());
}
}
- LogicalInput map3 = inputs.get("map3");
+ LogicalInput map3 = getInputs().get("map3");
kvReader = (KeyValuesReader) map3.getReader();
while (kvReader.next()) {
String word = ((Text) kvReader.getCurrentKey()).toString();
IntWritable intVal = (IntWritable) kvReader.getCurrentValues().iterator().next();
AtomicInteger value = unionKv.get(word);
- if (value == null) {
+ if (value == null) {
throw new TezUncheckedException("Expected to exist: " + word);
} else {
- value.getAndAdd(intVal.get()*-2);
+ value.getAndAdd(intVal.get() * -2);
}
}
for (AtomicInteger value : unionKv.values()) {
@@ -211,22 +171,10 @@ public class UnionExample {
}
}
kvWriter.write("Union", new IntWritable(unionKv.size()));
- if (out.isCommitRequired()) {
- while (!getContext().canCommit()) {
- Thread.sleep(100);
- }
- out.commit();
- }
- if (allParts.isCommitRequired()) {
- while (!getContext().canCommit()) {
- Thread.sleep(100);
- }
- allParts.commit();
- }
}
-
+
}
-
+
private DAG createDAG(FileSystem fs, TezConfiguration tezConf,
Map<String, LocalResource> localResources, Path stagingDir,
String inputPath, String outputPath) throws IOException {
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b084c7f8/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/WordCount.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/WordCount.java b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/WordCount.java
index 0e8aaba..8a90259 100644
--- a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/WordCount.java
+++ b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/WordCount.java
@@ -18,7 +18,6 @@
package org.apache.tez.mapreduce.examples;
import java.io.IOException;
-import java.util.List;
import java.util.Map;
import java.util.StringTokenizer;
import java.util.TreeMap;
@@ -62,10 +61,7 @@ import org.apache.tez.mapreduce.common.MRInputAMSplitGenerator;
import org.apache.tez.mapreduce.hadoop.MRHelpers;
import org.apache.tez.mapreduce.input.MRInput;
import org.apache.tez.mapreduce.output.MROutput;
-import org.apache.tez.runtime.api.AbstractLogicalIOProcessor;
-import org.apache.tez.runtime.api.Event;
-import org.apache.tez.runtime.api.LogicalInput;
-import org.apache.tez.runtime.api.LogicalOutput;
+import org.apache.tez.mapreduce.processor.SimpleMRProcessor;
import org.apache.tez.runtime.library.api.KeyValueReader;
import org.apache.tez.runtime.library.api.KeyValueWriter;
import org.apache.tez.runtime.library.api.KeyValuesReader;
@@ -74,25 +70,19 @@ import org.apache.tez.runtime.library.output.OnFileSortedOutput;
import com.google.common.base.Preconditions;
+
public class WordCount {
- public static class TokenProcessor extends AbstractLogicalIOProcessor {
+ public static class TokenProcessor extends SimpleMRProcessor {
IntWritable one = new IntWritable(1);
Text word = new Text();
@Override
- public void run(Map<String, LogicalInput> inputs,
- Map<String, LogicalOutput> outputs) throws Exception {
- for (LogicalInput input : inputs.values()) {
- input.start();
- }
- for (LogicalOutput output : outputs.values()) {
- output.start();
- }
- Preconditions.checkArgument(inputs.size() == 1);
- Preconditions.checkArgument(outputs.size() == 1);
- MRInput input = (MRInput) inputs.values().iterator().next();
+ public void run() throws Exception {
+ Preconditions.checkArgument(getInputs().size() == 1);
+ Preconditions.checkArgument(getOutputs().size() == 1);
+ MRInput input = (MRInput) getInputs().values().iterator().next();
KeyValueReader kvReader = input.getReader();
- OnFileSortedOutput output = (OnFileSortedOutput) outputs.values().iterator().next();
+ OnFileSortedOutput output = (OnFileSortedOutput) getOutputs().values().iterator().next();
KeyValueWriter kvWriter = (KeyValueWriter) output.getWriter();
while (kvReader.next()) {
StringTokenizer itr = new StringTokenizer(kvReader.getCurrentValue().toString());
@@ -103,39 +93,16 @@ public class WordCount {
}
}
- @Override
- public void initialize() throws Exception {
-
- }
-
- @Override
- public void handleEvents(List<Event> processorEvents) {
-
- }
-
- @Override
- public void close() throws Exception {
-
- }
-
}
-
- public static class SumProcessor extends AbstractLogicalIOProcessor {
- @Override
- public void run(Map<String, LogicalInput> inputs,
- Map<String, LogicalOutput> outputs) throws Exception {
- Preconditions.checkArgument(inputs.size() == 1);
-
- for (LogicalInput input : inputs.values()) {
- input.start();
- }
- for (LogicalOutput output : outputs.values()) {
- output.start();
- }
- MROutput out = (MROutput) outputs.values().iterator().next();
+ public static class SumProcessor extends SimpleMRProcessor {
+ @Override
+ public void run() throws Exception {
+ Preconditions.checkArgument(getInputs().size() == 1);
+ MROutput out = (MROutput) getOutputs().values().iterator().next();
KeyValueWriter kvWriter = out.getWriter();
- KeyValuesReader kvReader = (KeyValuesReader) inputs.values().iterator().next().getReader();
+ KeyValuesReader kvReader = (KeyValuesReader) getInputs().values().iterator().next()
+ .getReader();
while (kvReader.next()) {
Text word = (Text) kvReader.getCurrentKey();
int sum = 0;
@@ -144,31 +111,9 @@ public class WordCount {
}
kvWriter.write(word, new IntWritable(sum));
}
- if (out.isCommitRequired()) {
- while (!getContext().canCommit()) {
- Thread.sleep(100);
- }
- out.commit();
- }
}
-
- @Override
- public void initialize() throws Exception {
-
- }
-
- @Override
- public void handleEvents(List<Event> processorEvents) {
-
- }
-
- @Override
- public void close() throws Exception {
-
- }
-
}
-
+
private DAG createDAG(FileSystem fs, TezConfiguration tezConf,
Map<String, LocalResource> localResources, Path stagingDir,
String inputPath, String outputPath) throws IOException {
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b084c7f8/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/SimpleMRProcessor.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/SimpleMRProcessor.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/SimpleMRProcessor.java
new file mode 100644
index 0000000..5fdcec0
--- /dev/null
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/SimpleMRProcessor.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.mapreduce.processor;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.tez.mapreduce.output.MROutput;
+import org.apache.tez.runtime.api.LogicalOutput;
+import org.apache.tez.runtime.library.processor.SimpleProcessor;
+
+import com.google.common.collect.Lists;
+
+public abstract class SimpleMRProcessor extends SimpleProcessor {
+ private static final Log LOG = LogFactory.getLog(SimpleMRProcessor.class);
+
+ @Override
+ protected void postOp() throws Exception {
+ if (getOutputs() == null) {
+ return; // No post op
+ }
+ List<MROutput> mrOuts = Lists.newLinkedList();
+ for (LogicalOutput output : getOutputs().values()) {
+ if ((output instanceof MROutput) && (((MROutput) output).isCommitRequired())) {
+ mrOuts.add((MROutput) output);
+ }
+ }
+ if (mrOuts.size() > 0) {
+ while (!getContext().canCommit()) {
+ Thread.sleep(100);
+ }
+ boolean willAbort = false;
+ Exception savedEx = null;
+ for (MROutput output : mrOuts) {
+ try {
+ output.commit();
+ } catch (IOException ioe) {
+ LOG.warn("Error in committing output", ioe);
+ willAbort = true;
+ savedEx = ioe;
+ break;
+ }
+ }
+ if (willAbort == true) {
+ for (MROutput output : mrOuts) {
+ try {
+ output.abort();
+ } catch (IOException ioe) {
+ LOG.warn("Error in aborting output", ioe);
+ }
+ }
+ throw savedEx;
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b084c7f8/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/processor/SimpleProcessor.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/processor/SimpleProcessor.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/processor/SimpleProcessor.java
new file mode 100644
index 0000000..60eae30
--- /dev/null
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/processor/SimpleProcessor.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.runtime.library.processor;
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.tez.runtime.api.AbstractLogicalIOProcessor;
+import org.apache.tez.runtime.api.Event;
+import org.apache.tez.runtime.api.LogicalInput;
+import org.apache.tez.runtime.api.LogicalOutput;
+
+public abstract class SimpleProcessor extends AbstractLogicalIOProcessor {
+ protected Map<String, LogicalInput> inputs;
+ protected Map<String, LogicalOutput> outputs;
+
+ public void run(Map<String, LogicalInput> _inputs, Map<String, LogicalOutput> _outputs)
+ throws Exception {
+ this.inputs = _inputs;
+ this.outputs = _outputs;
+ preOp();
+ run();
+ postOp();
+ }
+
+ public abstract void run() throws Exception;
+
+ protected void preOp() throws Exception {
+ if (getInputs() != null) {
+ for (LogicalInput input : getInputs().values()) {
+ input.start();
+ }
+ }
+ if (getOutputs() != null) {
+ for (LogicalOutput output : getOutputs().values()) {
+ output.start();
+ }
+ }
+ }
+
+ protected void postOp() throws Exception {
+ //No-op
+ }
+
+ @Override
+ public void initialize() throws Exception {
+
+ }
+
+ @Override
+ public void handleEvents(List<Event> processorEvents) {
+
+ }
+
+ @Override
+ public void close() throws Exception {
+
+ }
+
+ public Map<String, LogicalInput> getInputs() {
+ return inputs;
+ }
+
+ public Map<String, LogicalOutput> getOutputs() {
+ return outputs;
+ }
+
+}