You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2013/09/05 02:00:46 UTC
git commit: TEZ-400. Change RuntimeTask to handle Input/Output
lifecycle (part of TEZ-398). (sseth)
Updated Branches:
refs/heads/TEZ-398 73e1b6c4c -> 52ff7aca3
TEZ-400. Change RuntimeTask to handle Input/Output lifecycle (part of
TEZ-398). (sseth)
Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/52ff7aca
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/52ff7aca
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/52ff7aca
Branch: refs/heads/TEZ-398
Commit: 52ff7aca3b5c2a389ab7b6b661b2b531ac7dbe33
Parents: 73e1b6c
Author: Siddharth Seth <ss...@apache.org>
Authored: Wed Sep 4 17:00:00 2013 -0700
Committer: Siddharth Seth <ss...@apache.org>
Committed: Wed Sep 4 17:00:00 2013 -0700
----------------------------------------------------------------------
.../tez/engine/newapi/TezTaskContext.java | 6 -
.../tez/engine/newapi/rpc/impl/TaskSpec.java | 18 ++
.../engine/newapi/impl/TezInputContextImpl.java | 5 +-
.../newapi/impl/TezOutputContextImpl.java | 31 +-
.../newapi/impl/TezProcessorContextImpl.java | 4 +-
.../engine/newapi/impl/TezTaskContextImpl.java | 14 +-
.../LogicalIOProcessorRuntimeTask.java | 283 +++++++++++++++++++
.../tez/engine/newruntime/RuntimeUtils.java | 62 ++++
8 files changed, 401 insertions(+), 22 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/52ff7aca/tez-engine-api/src/main/java/org/apache/tez/engine/newapi/TezTaskContext.java
----------------------------------------------------------------------
diff --git a/tez-engine-api/src/main/java/org/apache/tez/engine/newapi/TezTaskContext.java b/tez-engine-api/src/main/java/org/apache/tez/engine/newapi/TezTaskContext.java
index aff7cde..0724885 100644
--- a/tez-engine-api/src/main/java/org/apache/tez/engine/newapi/TezTaskContext.java
+++ b/tez-engine-api/src/main/java/org/apache/tez/engine/newapi/TezTaskContext.java
@@ -21,7 +21,6 @@ package org.apache.tez.engine.newapi;
import java.util.List;
import org.apache.tez.common.counters.TezCounters;
-import org.apache.tez.dag.api.TezConfiguration;
/**
* Base interface for Context classes used to initialize the Input, Output
@@ -29,11 +28,6 @@ import org.apache.tez.dag.api.TezConfiguration;
*/
public interface TezTaskContext {
- /**
- * Get the Tez Configuration
- * @return {@link TezConfiguration}
- */
- public TezConfiguration getConfiguration();
/**
* Get the index of this Task
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/52ff7aca/tez-engine-api/src/main/java/org/apache/tez/engine/newapi/rpc/impl/TaskSpec.java
----------------------------------------------------------------------
diff --git a/tez-engine-api/src/main/java/org/apache/tez/engine/newapi/rpc/impl/TaskSpec.java b/tez-engine-api/src/main/java/org/apache/tez/engine/newapi/rpc/impl/TaskSpec.java
index 8349d5b..2ba8cf7 100644
--- a/tez-engine-api/src/main/java/org/apache/tez/engine/newapi/rpc/impl/TaskSpec.java
+++ b/tez-engine-api/src/main/java/org/apache/tez/engine/newapi/rpc/impl/TaskSpec.java
@@ -21,6 +21,7 @@ package org.apache.tez.engine.newapi.rpc.impl;
import java.util.List;
import org.apache.tez.dag.api.ProcessorDescriptor;
+import org.apache.tez.dag.records.TezTaskAttemptID;
/**
* Serializable Task information that is sent across the Umbilical from the
@@ -29,6 +30,23 @@ import org.apache.tez.dag.api.ProcessorDescriptor;
public interface TaskSpec {
/**
+ * Get the vertex name for the current task.
+ * @return the vertex name set by the user.
+ */
+ public String getVertexName();
+
+ /**
+ * Get the task attempt id for the current task.
+ * @return the {@link TezTaskAttemptID}
+ */
+ public TezTaskAttemptID getTaskAttemptID();
+
+ /**
+ * Get the owner of the job.
+ * @return the owner.
+ */
+ public String getUser();
+ /**
* The Processor definition for the given Task
* @return {@link ProcessorDescriptor}
*/
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/52ff7aca/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezInputContextImpl.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezInputContextImpl.java b/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezInputContextImpl.java
index 2047e59..4b04474 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezInputContextImpl.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezInputContextImpl.java
@@ -20,6 +20,8 @@ package org.apache.tez.engine.newapi.impl;
import java.util.List;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.conf.Configuration;
import org.apache.tez.common.counters.TezCounters;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.records.TezTaskAttemptID;
@@ -31,7 +33,8 @@ public class TezInputContextImpl extends TezTaskContextImpl
private final byte[] userPayload;
- public TezInputContextImpl(TezConfiguration tezConf, String vertexName,
+ @Private
+ public TezInputContextImpl(Configuration tezConf, String vertexName,
TezTaskAttemptID taskAttemptID, TezCounters counters,
byte[] userPayload) {
super(tezConf, vertexName, taskAttemptID, counters);
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/52ff7aca/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezOutputContextImpl.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezOutputContextImpl.java b/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezOutputContextImpl.java
index 5bb7917..9e488e2 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezOutputContextImpl.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezOutputContextImpl.java
@@ -1,19 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package org.apache.tez.engine.newapi.impl;
import java.util.List;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.conf.Configuration;
import org.apache.tez.common.counters.TezCounters;
-import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.engine.newapi.Event;
+import org.apache.tez.engine.newapi.TezOutputContext;
-public class TezOutputContextImpl extends TezTaskContextImpl {
+public class TezOutputContextImpl extends TezTaskContextImpl implements
+ TezOutputContext {
private final byte[] userPayload;
- public TezOutputContextImpl(TezConfiguration tezConf, String vertexName,
- TezTaskAttemptID taskAttemptID, TezCounters counters,
- byte[] userPayload) {
+ @Private
+ public TezOutputContextImpl(Configuration tezConf, String vertexName,
+ TezTaskAttemptID taskAttemptID, TezCounters counters, byte[] userPayload) {
super(tezConf, vertexName, taskAttemptID, counters);
this.userPayload = userPayload;
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/52ff7aca/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezProcessorContextImpl.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezProcessorContextImpl.java b/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezProcessorContextImpl.java
index 26e6978..b987bfe 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezProcessorContextImpl.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezProcessorContextImpl.java
@@ -20,8 +20,8 @@ package org.apache.tez.engine.newapi.impl;
import java.util.List;
+import org.apache.hadoop.conf.Configuration;
import org.apache.tez.common.counters.TezCounters;
-import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.engine.newapi.Event;
import org.apache.tez.engine.newapi.TezProcessorContext;
@@ -31,7 +31,7 @@ public class TezProcessorContextImpl extends TezTaskContextImpl
private final byte[] userPayload;
- public TezProcessorContextImpl(TezConfiguration tezConf, String vertexName,
+ public TezProcessorContextImpl(Configuration tezConf, String vertexName,
TezTaskAttemptID taskAttemptID, TezCounters counters,
byte[] userPayload) {
super(tezConf, vertexName, taskAttemptID, counters);
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/52ff7aca/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezTaskContextImpl.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezTaskContextImpl.java b/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezTaskContextImpl.java
index 2feeb85..862b6d2 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezTaskContextImpl.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezTaskContextImpl.java
@@ -18,19 +18,21 @@
package org.apache.tez.engine.newapi.impl;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.conf.Configuration;
import org.apache.tez.common.counters.TezCounters;
-import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.engine.newapi.TezTaskContext;
public abstract class TezTaskContextImpl implements TezTaskContext {
- private final TezConfiguration tezConf;
+ private final Configuration tezConf;
private final String vertexName;
private final TezTaskAttemptID taskAttemptID;
private final TezCounters counters;
- public TezTaskContextImpl(TezConfiguration tezConf,
+ @Private
+ public TezTaskContextImpl(Configuration tezConf,
String vertexName, TezTaskAttemptID taskAttemptID,
TezCounters counters) {
this.tezConf = tezConf;
@@ -40,11 +42,6 @@ public abstract class TezTaskContextImpl implements TezTaskContext {
}
@Override
- public TezConfiguration getConfiguration() {
- return tezConf;
- }
-
- @Override
public int getTaskIndex() {
return taskAttemptID.getTaskID().getId();
}
@@ -64,4 +61,5 @@ public abstract class TezTaskContextImpl implements TezTaskContext {
return counters;
}
+ // TODO Add a method to get working dir
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/52ff7aca/tez-engine/src/main/java/org/apache/tez/engine/newruntime/LogicalIOProcessorRuntimeTask.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/newruntime/LogicalIOProcessorRuntimeTask.java b/tez-engine/src/main/java/org/apache/tez/engine/newruntime/LogicalIOProcessorRuntimeTask.java
new file mode 100644
index 0000000..a31346c
--- /dev/null
+++ b/tez-engine/src/main/java/org/apache/tez/engine/newruntime/LogicalIOProcessorRuntimeTask.java
@@ -0,0 +1,283 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.engine.newruntime;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.tez.common.counters.TezCounters;
+import org.apache.tez.dag.api.ProcessorDescriptor;
+import org.apache.tez.dag.api.TezUncheckedException;
+import org.apache.tez.engine.newapi.Event;
+import org.apache.tez.engine.newapi.Input;
+import org.apache.tez.engine.newapi.LogicalIOProcessor;
+import org.apache.tez.engine.newapi.LogicalInput;
+import org.apache.tez.engine.newapi.LogicalOutput;
+import org.apache.tez.engine.newapi.Output;
+import org.apache.tez.engine.newapi.Processor;
+import org.apache.tez.engine.newapi.TezInputContext;
+import org.apache.tez.engine.newapi.TezOutputContext;
+import org.apache.tez.engine.newapi.TezProcessorContext;
+import org.apache.tez.engine.newapi.impl.TezInputContextImpl;
+import org.apache.tez.engine.newapi.impl.TezOutputContextImpl;
+import org.apache.tez.engine.newapi.impl.TezProcessorContextImpl;
+import org.apache.tez.engine.newapi.rpc.impl.InputSpec;
+import org.apache.tez.engine.newapi.rpc.impl.OutputSpec;
+import org.apache.tez.engine.newapi.rpc.impl.TaskSpec;
+
+import com.google.common.base.Preconditions;
+
+@Private
+public class LogicalIOProcessorRuntimeTask {
+
+ private enum State {
+ NEW, INITED, RUNNING, CLOSED
+ }
+
+ private static final Log LOG = LogFactory
+ .getLog(LogicalIOProcessorRuntimeTask.class);
+
+ private final TaskSpec taskSpec;
+ private final Configuration tezConf;
+
+ private final List<InputSpec> inputSpecs;
+ private final List<LogicalInput> inputs;
+
+ private final List<OutputSpec> outputSpecs;
+ private final List<LogicalOutput> outputs;
+
+ private final ProcessorDescriptor processorDescriptor;
+ private final LogicalIOProcessor processor;
+
+ private final TezCounters tezCounters;
+
+ private State state;
+
+ private Map<String, LogicalInput> inputMap;
+ private Map<String, LogicalOutput> outputMap;
+
+ private Map<String, List<Event>> initInputEventMap;
+ private Map<String, List<Event>> initOutputEventMap;
+
+ private Map<String, List<Event>> closeInputEventMap;
+ private Map<String, List<Event>> closeOutputEventMap;
+
+ /**
+  * Builds the runtime task for a single task attempt.
+  *
+  * Logs the supplied spec, records the spec and the configuration, then
+  * instantiates the inputs, the outputs and the processor (in that order)
+  * from the class names carried by the spec. A fresh {@link TezCounters} is
+  * created and the task starts in the NEW state; nothing is initialized here.
+  *
+  * @param taskSpec description of the task: its inputs, outputs and processor
+  * @param tezConf configuration handed to every context created later
+  * @throws TezUncheckedException if a configured class cannot be loaded or
+  *         instantiated, or is not a LogicalInput / LogicalOutput /
+  *         LogicalIOProcessor respectively
+  */
+ public LogicalIOProcessorRuntimeTask(TaskSpec taskSpec, Configuration tezConf) {
+ LOG.info("Initializing LogicalIOProcessorRuntimeTask with TaskSpec: "
+ + taskSpec);
+ this.taskSpec = taskSpec;
+ this.tezConf = tezConf;
+ // Inputs, outputs and the processor are only instantiated here; their
+ // initialize() methods are invoked later from initialize().
+ this.inputSpecs = taskSpec.getInputs();
+ this.inputs = createInputs(inputSpecs);
+ this.outputSpecs = taskSpec.getOutputs();
+ this.outputs = createOutputs(outputSpecs);
+ this.processorDescriptor = taskSpec.getProcessorDescriptor();
+ this.processor = createProcessor(processorDescriptor);
+ // A single counters instance is shared by all contexts of this task.
+ this.tezCounters = new TezCounters();
+ this.state = State.NEW;
+ }
+
+ /**
+  * Initializes every input, then every output, then the processor.
+  *
+  * The events returned by each input/output are stored per source or
+  * destination vertex name and can be fetched afterwards through
+  * {@link #getInputInitEvents()} and {@link #getOutputInitEvents()}.
+  * The state moves to INITED before any component is initialized.
+  *
+  * @throws IOException if an input, output or the processor fails to
+  *         initialize
+  * @throws IllegalStateException if the task is not in the NEW state
+  */
+ public void initialize() throws IOException {
+   Preconditions.checkState(this.state == State.NEW, "Already initialized");
+   this.state = State.INITED;
+
+   final int numInputs = inputs.size();
+   final int numOutputs = outputs.size();
+   inputMap = new LinkedHashMap<String, LogicalInput>(numInputs);
+   outputMap = new LinkedHashMap<String, LogicalOutput>(numOutputs);
+   initInputEventMap = new LinkedHashMap<String, List<Event>>(numInputs);
+   initOutputEventMap = new LinkedHashMap<String, List<Event>>(numOutputs);
+
+   // TODO Maybe close initialized inputs / outputs in case of failure to
+   // initialize.
+   // Inputs first. TODO: Multi-threaded at some point.
+   for (int idx = 0; idx < numInputs; idx++) {
+     InputSpec spec = inputSpecs.get(idx);
+     LogicalInput logicalInput = inputs.get(idx);
+     String sourceVertex = spec.getSourceVertexName();
+     // TODO Add null/event list checking here or in the actual executor.
+     initInputEventMap.put(sourceVertex, initializeInput(logicalInput, spec));
+     inputMap.put(sourceVertex, logicalInput);
+   }
+
+   // Then outputs. TODO: Multi-threaded at some point.
+   for (int idx = 0; idx < numOutputs; idx++) {
+     OutputSpec spec = outputSpecs.get(idx);
+     LogicalOutput logicalOutput = outputs.get(idx);
+     String destinationVertex = spec.getDestinationVertexName();
+     // TODO Add null/event list checking here or in the actual executor.
+     initOutputEventMap.put(destinationVertex,
+         initializeOutput(logicalOutput, spec));
+     outputMap.put(destinationVertex, logicalOutput);
+   }
+
+   // Finally the processor.
+   initializeLogicalIOProcessor();
+ }
+
+ /**
+  * Returns the events produced while initializing the inputs, keyed by
+  * source vertex name.
+  *
+  * @return the map filled in by {@link #initialize()}
+  * @throws IllegalStateException if the task is still in the NEW state
+  */
+ public Map<String, List<Event>> getInputInitEvents() {
+   final boolean initStarted = this.state != State.NEW;
+   Preconditions.checkState(initStarted, "Not initialized yet");
+   return this.initInputEventMap;
+ }
+
+ /**
+  * Returns the events produced while initializing the outputs, keyed by
+  * destination vertex name.
+  *
+  * @return the map filled in by {@link #initialize()}
+  * @throws IllegalStateException if the task is still in the NEW state
+  */
+ public Map<String, List<Event>> getOutputInitEvents() {
+   final boolean initStarted = this.state != State.NEW;
+   Preconditions.checkState(initStarted, "Not initialized yet");
+   return this.initOutputEventMap;
+ }
+
+ /**
+  * Runs the processor against the initialized inputs and outputs.
+  *
+  * @throws IOException if the processor fails
+  * @throws IllegalStateException if the task is not in the INITED state
+  */
+ public void run() throws IOException {
+   Preconditions.checkState(this.state == State.INITED,
+       "Can only run while in INITED state. Current: " + this.state);
+   this.state = State.RUNNING;
+   // The processor field is already typed as LogicalIOProcessor.
+   processor.run(inputMap, outputMap);
+ }
+
+ /**
+  * Closes the inputs, then the processor, then the outputs.
+  *
+  * The events returned by each input/output on close are stored per source
+  * or destination vertex name and can be fetched afterwards through
+  * {@link #getInputCloseEvents()} and {@link #getOutputCloseEvents()}.
+  * The state moves to CLOSED before any component is closed, so a failure
+  * part-way through leaves the remaining components unclosed.
+  *
+  * @throws IOException if an input, the processor or an output fails to close
+  * @throws IllegalStateException if the task is not in the RUNNING state
+  */
+ public void close() throws IOException {
+   Preconditions.checkState(this.state == State.RUNNING,
+       "Can only close while in RUNNING state. Current: " + this.state);
+   this.state = State.CLOSED;
+   closeInputEventMap = new LinkedHashMap<String, List<Event>>(inputs.size());
+   closeOutputEventMap = new LinkedHashMap<String, List<Event>>(outputs.size());
+
+   // Close the Inputs.
+   for (int i = 0; i < inputs.size(); i++) {
+     String srcVertexName = inputSpecs.get(i).getSourceVertexName();
+     List<Event> closeInputEvents = inputs.get(i).close();
+     closeInputEventMap.put(srcVertexName, closeInputEvents);
+   }
+
+   // Close the Processor.
+   processor.close();
+
+   // Close the Outputs.
+   for (int i = 0; i < outputs.size(); i++) {
+     String destVertexName = outputSpecs.get(i).getDestinationVertexName();
+     List<Event> closeOutputEvents = outputs.get(i).close();
+     closeOutputEventMap.put(destVertexName, closeOutputEvents);
+   }
+ }
+
+ /**
+  * Returns the events produced while closing the inputs, keyed by source
+  * vertex name.
+  *
+  * @return the map filled in by {@link #close()}
+  * @throws IllegalStateException if the task is not in the CLOSED state
+  */
+ public Map<String, List<Event>> getInputCloseEvents() {
+   final boolean closed = this.state == State.CLOSED;
+   Preconditions.checkState(closed, "Not closed yet");
+   return this.closeInputEventMap;
+ }
+
+ /**
+  * Returns the events produced while closing the outputs, keyed by
+  * destination vertex name.
+  *
+  * @return the map filled in by {@link #close()}
+  * @throws IllegalStateException if the task is not in the CLOSED state
+  */
+ public Map<String, List<Event>> getOutputCloseEvents() {
+   final boolean closed = this.state == State.CLOSED;
+   Preconditions.checkState(closed, "Not closed yet");
+   return this.closeOutputEventMap;
+ }
+
+ /**
+  * Creates a context for the given input, tells a logical input how many
+  * physical inputs it has, and initializes it.
+  *
+  * @param input the input to initialize
+  * @param inputSpec the spec the input was created from
+  * @return the events returned by the input's initialize call
+  * @throws IOException if the input fails to initialize
+  */
+ private List<Event> initializeInput(Input input, InputSpec inputSpec)
+     throws IOException {
+   // The context is built first, exactly as before, so any failure while
+   // reading the descriptor surfaces before the input is touched.
+   final TezInputContext context = createInputContext(inputSpec);
+   if (input instanceof LogicalInput) {
+     final LogicalInput logicalInput = (LogicalInput) input;
+     logicalInput.setNumPhysicalInputs(inputSpec.getPhysicalEdgeCount());
+   }
+   List<Event> initEvents = input.initialize(context);
+   return initEvents;
+ }
+
+ /**
+  * Creates a context for the given output, tells a logical output how many
+  * physical outputs it has, and initializes it.
+  *
+  * @param output the output to initialize
+  * @param outputSpec the spec the output was created from
+  * @return the events returned by the output's initialize call
+  * @throws IOException if the output fails to initialize
+  */
+ private List<Event> initializeOutput(Output output, OutputSpec outputSpec)
+     throws IOException {
+   // The context is built first, exactly as before, so any failure while
+   // reading the descriptor surfaces before the output is touched.
+   final TezOutputContext context = createOutputContext(outputSpec);
+   if (output instanceof LogicalOutput) {
+     final LogicalOutput logicalOutput = (LogicalOutput) output;
+     logicalOutput.setNumPhysicalOutputs(outputSpec.getPhysicalEdgeCount());
+   }
+   List<Event> initEvents = output.initialize(context);
+   return initEvents;
+ }
+
+ /**
+  * Initializes the processor with a freshly created processor context.
+  *
+  * @throws IOException if the processor fails to initialize
+  */
+ private void initializeLogicalIOProcessor() throws IOException {
+   processor.initialize(createProcessorContext());
+ }
+
+ /**
+  * Builds the context handed to an input on initialization.
+  *
+  * @param inputSpec spec whose input descriptor supplies the user payload
+  * @return a new context carrying this task's configuration, vertex name,
+  *         attempt id and counters
+  */
+ private TezInputContext createInputContext(InputSpec inputSpec) {
+   return new TezInputContextImpl(
+       tezConf,
+       taskSpec.getVertexName(),
+       taskSpec.getTaskAttemptID(),
+       tezCounters,
+       inputSpec.getInputDescriptor().getUserPayload());
+ }
+
+ /**
+  * Builds the context handed to an output on initialization.
+  *
+  * @param outputSpec spec whose output descriptor supplies the user payload
+  * @return a new context carrying this task's configuration, vertex name,
+  *         attempt id and counters
+  */
+ private TezOutputContext createOutputContext(OutputSpec outputSpec) {
+   return new TezOutputContextImpl(
+       tezConf,
+       taskSpec.getVertexName(),
+       taskSpec.getTaskAttemptID(),
+       tezCounters,
+       outputSpec.getOutputDescriptor().getUserPayload());
+ }
+
+ /**
+  * Builds the context handed to the processor on initialization.
+  *
+  * @return a new context carrying this task's configuration, vertex name,
+  *         attempt id, counters and the processor descriptor's user payload
+  */
+ private TezProcessorContext createProcessorContext() {
+   return new TezProcessorContextImpl(
+       tezConf,
+       taskSpec.getVertexName(),
+       taskSpec.getTaskAttemptID(),
+       tezCounters,
+       processorDescriptor.getUserPayload());
+ }
+
+ /**
+  * Instantiates one input per spec, in spec order.
+  *
+  * @param inputSpecs specs naming the input classes to instantiate
+  * @return the created inputs, index-aligned with {@code inputSpecs}
+  * @throws TezUncheckedException if a created input is not a LogicalInput
+  */
+ private List<LogicalInput> createInputs(List<InputSpec> inputSpecs) {
+   List<LogicalInput> created = new ArrayList<LogicalInput>(inputSpecs.size());
+   for (InputSpec spec : inputSpecs) {
+     String inputClassName = spec.getInputDescriptor().getClassName();
+     Input candidate = RuntimeUtils.createClazzInstance(inputClassName);
+
+     // Reject anything that is not a logical input before collecting it.
+     if (!(candidate instanceof LogicalInput)) {
+       throw new TezUncheckedException(
+           candidate.getClass().getName()
+               + " is not a sub-type of LogicalInput. Only LogicalInput sub-types supported by a LogicalIOProcessor.");
+     }
+     created.add((LogicalInput) candidate);
+   }
+   return created;
+ }
+
+ /**
+  * Instantiates one output per spec, in spec order.
+  *
+  * @param outputSpecs specs naming the output classes to instantiate
+  * @return the created outputs, index-aligned with {@code outputSpecs}
+  * @throws TezUncheckedException if a created output is not a LogicalOutput
+  */
+ private List<LogicalOutput> createOutputs(List<OutputSpec> outputSpecs) {
+   List<LogicalOutput> created = new ArrayList<LogicalOutput>(
+       outputSpecs.size());
+   for (OutputSpec spec : outputSpecs) {
+     String outputClassName = spec.getOutputDescriptor().getClassName();
+     Output candidate = RuntimeUtils.createClazzInstance(outputClassName);
+
+     // Reject anything that is not a logical output before collecting it.
+     if (!(candidate instanceof LogicalOutput)) {
+       throw new TezUncheckedException(
+           candidate.getClass().getName()
+               + " is not a sub-type of LogicalOutput. Only LogicalOutput sub-types supported by a LogicalIOProcessor.");
+     }
+     created.add((LogicalOutput) candidate);
+   }
+   return created;
+ }
+
+ /**
+  * Instantiates the processor named by the descriptor.
+  *
+  * @param processorDescriptor descriptor naming the processor class
+  * @return the created processor
+  * @throws TezUncheckedException if the created processor is not a
+  *         LogicalIOProcessor
+  */
+ private LogicalIOProcessor createProcessor(
+     ProcessorDescriptor processorDescriptor) {
+   String processorClassName = processorDescriptor.getClassName();
+   Processor candidate = RuntimeUtils.createClazzInstance(processorClassName);
+   if (candidate instanceof LogicalIOProcessor) {
+     return (LogicalIOProcessor) candidate;
+   }
+   throw new TezUncheckedException(
+       candidate.getClass().getName()
+           + " is not a sub-type of LogicalIOProcessor. Only LogicalIOProcessor sub-types supported at the moment");
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/52ff7aca/tez-engine/src/main/java/org/apache/tez/engine/newruntime/RuntimeUtils.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/newruntime/RuntimeUtils.java b/tez-engine/src/main/java/org/apache/tez/engine/newruntime/RuntimeUtils.java
new file mode 100644
index 0000000..20a029e
--- /dev/null
+++ b/tez-engine/src/main/java/org/apache/tez/engine/newruntime/RuntimeUtils.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.engine.newruntime;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.tez.dag.api.TezUncheckedException;
+
+public class RuntimeUtils {
+
+ private static final Map<String, Class<?>> CLAZZ_CACHE = new ConcurrentHashMap<String, Class<?>>();
+
+ /**
+  * Resolves a class by name, consulting the cache first.
+  *
+  * A class loaded on a cache miss is stored in the cache so later lookups of
+  * the same name skip {@link Class#forName(String)}. Two threads racing on
+  * the same name may both load it; they store the same value, so the
+  * duplicate put is harmless.
+  *
+  * @param className fully qualified name of the class to load
+  * @return the loaded class
+  * @throws TezUncheckedException if the class cannot be found
+  */
+ private static Class<?> getClazz(String className) {
+   Class<?> clazz = CLAZZ_CACHE.get(className);
+   if (clazz == null) {
+     try {
+       clazz = Class.forName(className);
+     } catch (ClassNotFoundException e) {
+       throw new TezUncheckedException("Unable to load class: " + className, e);
+     }
+     // Remember the result; without this the cache would never be populated.
+     CLAZZ_CACHE.put(className, clazz);
+   }
+   return clazz;
+ }
+
+ /**
+  * Creates an instance of the class through its no-argument constructor.
+  *
+  * @param clazz the class to instantiate
+  * @return the new instance
+  * @throws TezUncheckedException if the class cannot be instantiated or its
+  *         no-argument constructor is not accessible
+  */
+ private static <T> T getNewInstance(Class<T> clazz) {
+   try {
+     return clazz.newInstance();
+   } catch (InstantiationException instantiationFailure) {
+     throw new TezUncheckedException(
+         "Unable to instantiate class with 0 arguments: " + clazz.getName(),
+         instantiationFailure);
+   } catch (IllegalAccessException accessFailure) {
+     throw new TezUncheckedException(
+         "Unable to instantiate class with 0 arguments: " + clazz.getName(),
+         accessFailure);
+   }
+ }
+
+ /**
+  * Loads the named class and creates an instance of it through its
+  * no-argument constructor.
+  *
+  * The cast to {@code T} is unchecked; a wrong target type only fails where
+  * the caller uses the result.
+  *
+  * @param className fully qualified name of the class to instantiate
+  * @return the new instance, cast to the type expected by the caller
+  * @throws TezUncheckedException if the class cannot be loaded or
+  *         instantiated
+  */
+ public static <T> T createClazzInstance(String className) {
+   final Object created = getNewInstance(getClazz(className));
+   @SuppressWarnings("unchecked")
+   T typed = (T) created;
+   return typed;
+ }
+}