You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2013/09/05 02:00:46 UTC

git commit: TEZ-400. Change RuntimeTask to handle Input/Output lifecycle (part of TEZ-398). (sseth)

Updated Branches:
  refs/heads/TEZ-398 73e1b6c4c -> 52ff7aca3


TEZ-400. Change RuntimeTask to handle Input/Output lifecycle (part of
TEZ-398). (sseth)


Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/52ff7aca
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/52ff7aca
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/52ff7aca

Branch: refs/heads/TEZ-398
Commit: 52ff7aca3b5c2a389ab7b6b661b2b531ac7dbe33
Parents: 73e1b6c
Author: Siddharth Seth <ss...@apache.org>
Authored: Wed Sep 4 17:00:00 2013 -0700
Committer: Siddharth Seth <ss...@apache.org>
Committed: Wed Sep 4 17:00:00 2013 -0700

----------------------------------------------------------------------
 .../tez/engine/newapi/TezTaskContext.java       |   6 -
 .../tez/engine/newapi/rpc/impl/TaskSpec.java    |  18 ++
 .../engine/newapi/impl/TezInputContextImpl.java |   5 +-
 .../newapi/impl/TezOutputContextImpl.java       |  31 +-
 .../newapi/impl/TezProcessorContextImpl.java    |   4 +-
 .../engine/newapi/impl/TezTaskContextImpl.java  |  14 +-
 .../LogicalIOProcessorRuntimeTask.java          | 283 +++++++++++++++++++
 .../tez/engine/newruntime/RuntimeUtils.java     |  62 ++++
 8 files changed, 401 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/52ff7aca/tez-engine-api/src/main/java/org/apache/tez/engine/newapi/TezTaskContext.java
----------------------------------------------------------------------
diff --git a/tez-engine-api/src/main/java/org/apache/tez/engine/newapi/TezTaskContext.java b/tez-engine-api/src/main/java/org/apache/tez/engine/newapi/TezTaskContext.java
index aff7cde..0724885 100644
--- a/tez-engine-api/src/main/java/org/apache/tez/engine/newapi/TezTaskContext.java
+++ b/tez-engine-api/src/main/java/org/apache/tez/engine/newapi/TezTaskContext.java
@@ -21,7 +21,6 @@ package org.apache.tez.engine.newapi;
 import java.util.List;
 
 import org.apache.tez.common.counters.TezCounters;
-import org.apache.tez.dag.api.TezConfiguration;
 
 /**
  * Base interface for Context classes used to initialize the Input, Output
@@ -29,11 +28,6 @@ import org.apache.tez.dag.api.TezConfiguration;
  */
 public interface TezTaskContext {
 
-  /**
-   * Get the Tez Configuration
-   * @return {@link TezConfiguration}
-   */
-  public TezConfiguration getConfiguration();
 
   /**
    * Get the index of this Task

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/52ff7aca/tez-engine-api/src/main/java/org/apache/tez/engine/newapi/rpc/impl/TaskSpec.java
----------------------------------------------------------------------
diff --git a/tez-engine-api/src/main/java/org/apache/tez/engine/newapi/rpc/impl/TaskSpec.java b/tez-engine-api/src/main/java/org/apache/tez/engine/newapi/rpc/impl/TaskSpec.java
index 8349d5b..2ba8cf7 100644
--- a/tez-engine-api/src/main/java/org/apache/tez/engine/newapi/rpc/impl/TaskSpec.java
+++ b/tez-engine-api/src/main/java/org/apache/tez/engine/newapi/rpc/impl/TaskSpec.java
@@ -21,6 +21,7 @@ package org.apache.tez.engine.newapi.rpc.impl;
 import java.util.List;
 
 import org.apache.tez.dag.api.ProcessorDescriptor;
+import org.apache.tez.dag.records.TezTaskAttemptID;
 
 /**
  * Serializable Task information that is sent across the Umbilical from the
@@ -29,6 +30,23 @@ import org.apache.tez.dag.api.ProcessorDescriptor;
 public interface TaskSpec {
 
   /**
+   * Get the vertex name for the current task.
+   * @return the vertex name set by the user.
+   */
+  public String getVertexName();
+  
+  /**
+   * Get the task attempt id for the current task.
+   * @return the {@link TezTaskAttemptID}
+   */
+  public TezTaskAttemptID getTaskAttemptID();
+  
+  /**
+   * Get the owner of the job.
+   * @return the owner.
+   */
+  public String getUser();
+  /**
    * The Processor definition for the given Task
    * @return {@link ProcessorDescriptor}
    */

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/52ff7aca/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezInputContextImpl.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezInputContextImpl.java b/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezInputContextImpl.java
index 2047e59..4b04474 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezInputContextImpl.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezInputContextImpl.java
@@ -20,6 +20,8 @@ package org.apache.tez.engine.newapi.impl;
 
 import java.util.List;
 
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.tez.common.counters.TezCounters;
 import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.records.TezTaskAttemptID;
@@ -31,7 +33,8 @@ public class TezInputContextImpl extends TezTaskContextImpl
 
   private final byte[] userPayload;
 
-  public TezInputContextImpl(TezConfiguration tezConf, String vertexName,
+  @Private
+  public TezInputContextImpl(Configuration tezConf, String vertexName,
       TezTaskAttemptID taskAttemptID, TezCounters counters,
       byte[] userPayload) {
     super(tezConf, vertexName, taskAttemptID, counters);

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/52ff7aca/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezOutputContextImpl.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezOutputContextImpl.java b/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezOutputContextImpl.java
index 5bb7917..9e488e2 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezOutputContextImpl.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezOutputContextImpl.java
@@ -1,19 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package org.apache.tez.engine.newapi.impl;
 
 import java.util.List;
 
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.tez.common.counters.TezCounters;
-import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.records.TezTaskAttemptID;
 import org.apache.tez.engine.newapi.Event;
+import org.apache.tez.engine.newapi.TezOutputContext;
 
-public class TezOutputContextImpl extends TezTaskContextImpl {
+public class TezOutputContextImpl extends TezTaskContextImpl implements
+    TezOutputContext {
 
   private final byte[] userPayload;
 
-  public TezOutputContextImpl(TezConfiguration tezConf, String vertexName,
-      TezTaskAttemptID taskAttemptID, TezCounters counters,
-      byte[] userPayload) {
+  @Private
+  public TezOutputContextImpl(Configuration tezConf, String vertexName,
+      TezTaskAttemptID taskAttemptID, TezCounters counters, byte[] userPayload) {
     super(tezConf, vertexName, taskAttemptID, counters);
     this.userPayload = userPayload;
   }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/52ff7aca/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezProcessorContextImpl.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezProcessorContextImpl.java b/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezProcessorContextImpl.java
index 26e6978..b987bfe 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezProcessorContextImpl.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezProcessorContextImpl.java
@@ -20,8 +20,8 @@ package org.apache.tez.engine.newapi.impl;
 
 import java.util.List;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.tez.common.counters.TezCounters;
-import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.records.TezTaskAttemptID;
 import org.apache.tez.engine.newapi.Event;
 import org.apache.tez.engine.newapi.TezProcessorContext;
@@ -31,7 +31,7 @@ public class TezProcessorContextImpl extends TezTaskContextImpl
 
   private final byte[] userPayload;
 
-  public TezProcessorContextImpl(TezConfiguration tezConf, String vertexName,
+  public TezProcessorContextImpl(Configuration tezConf, String vertexName,
       TezTaskAttemptID taskAttemptID, TezCounters counters,
       byte[] userPayload) {
     super(tezConf, vertexName, taskAttemptID, counters);

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/52ff7aca/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezTaskContextImpl.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezTaskContextImpl.java b/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezTaskContextImpl.java
index 2feeb85..862b6d2 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezTaskContextImpl.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezTaskContextImpl.java
@@ -18,19 +18,21 @@
 
 package org.apache.tez.engine.newapi.impl;
 
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.tez.common.counters.TezCounters;
-import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.records.TezTaskAttemptID;
 import org.apache.tez.engine.newapi.TezTaskContext;
 
 public abstract class TezTaskContextImpl implements TezTaskContext {
 
-  private final TezConfiguration tezConf;
+  private final Configuration tezConf;
   private final String vertexName;
   private final TezTaskAttemptID taskAttemptID;
   private final TezCounters counters;
 
-  public TezTaskContextImpl(TezConfiguration tezConf,
+  @Private
+  public TezTaskContextImpl(Configuration tezConf,
       String vertexName, TezTaskAttemptID taskAttemptID,
       TezCounters counters) {
     this.tezConf = tezConf;
@@ -40,11 +42,6 @@ public abstract class TezTaskContextImpl implements TezTaskContext {
   }
 
   @Override
-  public TezConfiguration getConfiguration() {
-    return tezConf;
-  }
-
-  @Override
   public int getTaskIndex() {
     return taskAttemptID.getTaskID().getId();
   }
@@ -64,4 +61,5 @@ public abstract class TezTaskContextImpl implements TezTaskContext {
     return counters;
   }
 
+  // TODO Add a method to get working dir
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/52ff7aca/tez-engine/src/main/java/org/apache/tez/engine/newruntime/LogicalIOProcessorRuntimeTask.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/newruntime/LogicalIOProcessorRuntimeTask.java b/tez-engine/src/main/java/org/apache/tez/engine/newruntime/LogicalIOProcessorRuntimeTask.java
new file mode 100644
index 0000000..a31346c
--- /dev/null
+++ b/tez-engine/src/main/java/org/apache/tez/engine/newruntime/LogicalIOProcessorRuntimeTask.java
@@ -0,0 +1,283 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.engine.newruntime;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.tez.common.counters.TezCounters;
+import org.apache.tez.dag.api.ProcessorDescriptor;
+import org.apache.tez.dag.api.TezUncheckedException;
+import org.apache.tez.engine.newapi.Event;
+import org.apache.tez.engine.newapi.Input;
+import org.apache.tez.engine.newapi.LogicalIOProcessor;
+import org.apache.tez.engine.newapi.LogicalInput;
+import org.apache.tez.engine.newapi.LogicalOutput;
+import org.apache.tez.engine.newapi.Output;
+import org.apache.tez.engine.newapi.Processor;
+import org.apache.tez.engine.newapi.TezInputContext;
+import org.apache.tez.engine.newapi.TezOutputContext;
+import org.apache.tez.engine.newapi.TezProcessorContext;
+import org.apache.tez.engine.newapi.impl.TezInputContextImpl;
+import org.apache.tez.engine.newapi.impl.TezOutputContextImpl;
+import org.apache.tez.engine.newapi.impl.TezProcessorContextImpl;
+import org.apache.tez.engine.newapi.rpc.impl.InputSpec;
+import org.apache.tez.engine.newapi.rpc.impl.OutputSpec;
+import org.apache.tez.engine.newapi.rpc.impl.TaskSpec;
+
+import com.google.common.base.Preconditions;
+
+/** Drives a task's LogicalIOProcessor and its LogicalInputs/LogicalOutputs through their lifecycle. */
+@Private
+public class LogicalIOProcessorRuntimeTask {
+
+  private enum State {
+    NEW, INITED, RUNNING, CLOSED
+  }
+
+  private static final Log LOG = LogFactory
+      .getLog(LogicalIOProcessorRuntimeTask.class);
+
+  private final TaskSpec taskSpec;
+  private final Configuration tezConf;
+
+  private final List<InputSpec> inputSpecs;
+  private final List<LogicalInput> inputs;
+
+  private final List<OutputSpec> outputSpecs;
+  private final List<LogicalOutput> outputs;
+
+  private final ProcessorDescriptor processorDescriptor;
+  private final LogicalIOProcessor processor;
+
+  private final TezCounters tezCounters;
+
+  private State state;
+
+  private Map<String, LogicalInput> inputMap;
+  private Map<String, LogicalOutput> outputMap;
+
+  private Map<String, List<Event>> initInputEventMap;
+  private Map<String, List<Event>> initOutputEventMap;
+
+  private Map<String, List<Event>> closeInputEventMap;
+  private Map<String, List<Event>> closeOutputEventMap;
+
+  /** Instantiates the Inputs, Outputs and Processor named by taskSpec; none is initialized yet. */
+  public LogicalIOProcessorRuntimeTask(TaskSpec taskSpec, Configuration tezConf) {
+    LOG.info("Initializing LogicalIOProcessorRuntimeTask with TaskSpec: "
+        + taskSpec);
+    this.taskSpec = taskSpec;
+    this.tezConf = tezConf;
+    this.inputSpecs = taskSpec.getInputs();
+    this.inputs = createInputs(inputSpecs);
+    this.outputSpecs = taskSpec.getOutputs();
+    this.outputs = createOutputs(outputSpecs);
+    this.processorDescriptor = taskSpec.getProcessorDescriptor();
+    this.processor = createProcessor(processorDescriptor);
+    this.tezCounters = new TezCounters();
+    this.state = State.NEW;
+  }
+
+  /** Initializes every Input, then every Output, then the Processor; allowed only once, from NEW. */
+  public void initialize() throws IOException {
+    Preconditions.checkState(this.state == State.NEW, "Already initialized");
+    // State leaves NEW up front, so a failed initialize() cannot be retried.
+    this.state = State.INITED;
+    inputMap = new LinkedHashMap<String, LogicalInput>(inputs.size());
+    outputMap = new LinkedHashMap<String, LogicalOutput>(outputs.size());
+
+    initInputEventMap = new LinkedHashMap<String, List<Event>>(inputs.size());
+    initOutputEventMap = new LinkedHashMap<String, List<Event>>(outputs.size());
+
+    // TODO Maybe close initialized inputs / outputs in case of failure to
+    // initialize.
+    // Initialize all inputs. TODO: Multi-threaded at some point.
+    for (int i = 0; i < inputs.size(); i++) {
+      String srcVertexName = inputSpecs.get(i).getSourceVertexName();
+      List<Event> initInputEvents = initializeInput(inputs.get(i),
+          inputSpecs.get(i));
+      // TODO Add null/event list checking here or in the actual executor.
+      initInputEventMap.put(srcVertexName, initInputEvents);
+      inputMap.put(srcVertexName, inputs.get(i));
+    }
+
+    // Initialize all outputs. TODO: Multi-threaded at some point.
+    for (int i = 0; i < outputs.size(); i++) {
+      String destVertexName = outputSpecs.get(i).getDestinationVertexName();
+      List<Event> initOutputEvents = initializeOutput(outputs.get(i),
+          outputSpecs.get(i));
+      // TODO Add null/event list checking here or in the actual executor.
+      initOutputEventMap.put(destVertexName, initOutputEvents);
+      outputMap.put(destVertexName, outputs.get(i));
+    }
+
+    // Initialize processor.
+    initializeLogicalIOProcessor();
+  }
+
+  public Map<String, List<Event>> getInputInitEvents() {
+    Preconditions.checkState(this.state != State.NEW, "Not initialized yet");
+    return initInputEventMap;
+  }
+
+  public Map<String, List<Event>> getOutputInitEvents() {
+    Preconditions.checkState(this.state != State.NEW, "Not initialized yet");
+    return initOutputEventMap;
+  }
+
+  /** Runs the Processor over the Inputs/Outputs keyed by source/destination vertex name. */
+  public void run() throws IOException {
+    Preconditions.checkState(this.state == State.INITED,
+        "Can only run while in INITED state. Current: " + this.state);
+    this.state = State.RUNNING;
+    processor.run(inputMap, outputMap);
+  }
+
+  /** Closes the Inputs, then the Processor, then the Outputs, recording the events they return. */
+  public void close() throws IOException {
+    Preconditions.checkState(this.state == State.RUNNING,
+        "Can only close while in RUNNING state. Current: " + this.state);
+    this.state = State.CLOSED;
+    closeInputEventMap = new LinkedHashMap<String, List<Event>>(inputs.size());
+    closeOutputEventMap = new LinkedHashMap<String, List<Event>>(outputs.size());
+
+    // Close the Inputs.
+    for (int i = 0; i < inputs.size(); i++) {
+      String srcVertexName = inputSpecs.get(i).getSourceVertexName();
+      List<Event> closeInputEvents = inputs.get(i).close();
+      closeInputEventMap.put(srcVertexName, closeInputEvents);
+    }
+
+    // Close the Processor.
+    processor.close();
+
+    // Close the Outputs.
+    for (int i = 0; i < outputs.size(); i++) {
+      String destVertexName = outputSpecs.get(i).getDestinationVertexName();
+      List<Event> closeOutputEvents = outputs.get(i).close();
+      closeOutputEventMap.put(destVertexName, closeOutputEvents);
+    }
+  }
+
+  public Map<String, List<Event>> getInputCloseEvents() {
+    Preconditions.checkState(this.state == State.CLOSED, "Not closed yet");
+    return closeInputEventMap;
+  }
+
+  public Map<String, List<Event>> getOutputCloseEvents() {
+    Preconditions.checkState(this.state == State.CLOSED, "Not closed yet");
+    return closeOutputEventMap;
+  }
+
+  private List<Event> initializeInput(Input input, InputSpec inputSpec)
+      throws IOException {
+    TezInputContext tezInputContext = createInputContext(inputSpec);
+    if (input instanceof LogicalInput) {
+      ((LogicalInput) input).setNumPhysicalInputs(inputSpec
+          .getPhysicalEdgeCount());
+    }
+    return input.initialize(tezInputContext);
+  }
+
+  private List<Event> initializeOutput(Output output, OutputSpec outputSpec)
+      throws IOException {
+    TezOutputContext tezOutputContext = createOutputContext(outputSpec);
+    if (output instanceof LogicalOutput) {
+      ((LogicalOutput) output).setNumPhysicalOutputs(outputSpec
+          .getPhysicalEdgeCount());
+    }
+    return output.initialize(tezOutputContext);
+  }
+
+  private void initializeLogicalIOProcessor() throws IOException {
+    TezProcessorContext processorContext = createProcessorContext();
+    processor.initialize(processorContext);
+  }
+
+  private TezInputContext createInputContext(InputSpec inputSpec) {
+    return new TezInputContextImpl(tezConf, taskSpec.getVertexName(),
+        taskSpec.getTaskAttemptID(), tezCounters,
+        inputSpec.getInputDescriptor().getUserPayload());
+  }
+
+  private TezOutputContext createOutputContext(OutputSpec outputSpec) {
+    return new TezOutputContextImpl(tezConf, taskSpec.getVertexName(),
+        taskSpec.getTaskAttemptID(), tezCounters,
+        outputSpec.getOutputDescriptor().getUserPayload());
+  }
+
+  private TezProcessorContext createProcessorContext() {
+    return new TezProcessorContextImpl(tezConf, taskSpec.getVertexName(),
+        taskSpec.getTaskAttemptID(), tezCounters,
+        processorDescriptor.getUserPayload());
+  }
+
+  private List<LogicalInput> createInputs(List<InputSpec> inputSpecs) {
+    List<LogicalInput> inputs = new ArrayList<LogicalInput>(inputSpecs.size());
+    for (InputSpec inputSpec : inputSpecs) {
+      Input input = RuntimeUtils.createClazzInstance(inputSpec
+          .getInputDescriptor().getClassName());
+      if (input instanceof LogicalInput) {
+        inputs.add((LogicalInput) input);
+      } else {
+        throw new TezUncheckedException(
+            input.getClass().getName()
+                + " is not a sub-type of LogicalInput. Only LogicalInput sub-types supported by a LogicalIOProcessor.");
+      }
+    }
+    return inputs;
+  }
+
+  private List<LogicalOutput> createOutputs(List<OutputSpec> outputSpecs) {
+    List<LogicalOutput> outputs = new ArrayList<LogicalOutput>(
+        outputSpecs.size());
+    for (OutputSpec outputSpec : outputSpecs) {
+      Output output = RuntimeUtils.createClazzInstance(outputSpec
+          .getOutputDescriptor().getClassName());
+      if (output instanceof LogicalOutput) {
+        outputs.add((LogicalOutput) output);
+      } else {
+        throw new TezUncheckedException(
+            output.getClass().getName()
+                + " is not a sub-type of LogicalOutput. Only LogicalOutput sub-types supported by a LogicalIOProcessor.");
+      }
+    }
+    return outputs;
+  }
+
+  private LogicalIOProcessor createProcessor(
+      ProcessorDescriptor processorDescriptor) {
+    Processor processor = RuntimeUtils.createClazzInstance(processorDescriptor
+        .getClassName());
+    if (!(processor instanceof LogicalIOProcessor)) {
+      throw new TezUncheckedException(
+          processor.getClass().getName()
+              + " is not a sub-type of LogicalIOProcessor. Only LogicalIOProcessor sub-types supported at the moment");
+    }
+    return (LogicalIOProcessor) processor;
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/52ff7aca/tez-engine/src/main/java/org/apache/tez/engine/newruntime/RuntimeUtils.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/newruntime/RuntimeUtils.java b/tez-engine/src/main/java/org/apache/tez/engine/newruntime/RuntimeUtils.java
new file mode 100644
index 0000000..20a029e
--- /dev/null
+++ b/tez-engine/src/main/java/org/apache/tez/engine/newruntime/RuntimeUtils.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.engine.newruntime;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.tez.dag.api.TezUncheckedException;
+
+public class RuntimeUtils {
+
+  private static final Map<String, Class<?>> CLAZZ_CACHE = new ConcurrentHashMap<String, Class<?>>();
+
+  private RuntimeUtils() {} // Static helpers only.
+
+  private static Class<?> getClazz(String className) {
+    Class<?> clazz = CLAZZ_CACHE.get(className);
+    if (clazz == null) {
+      try {
+        clazz = Class.forName(className);
+      } catch (ClassNotFoundException e) {
+        throw new TezUncheckedException("Unable to load class: " + className, e);
+      }
+      CLAZZ_CACHE.put(className, clazz); // Cache so later lookups skip Class.forName.
+    }
+    return clazz;
+  }
+
+  private static <T> T getNewInstance(Class<T> clazz) {
+    try {
+      return clazz.newInstance();
+    } catch (InstantiationException e) {
+      throw new TezUncheckedException(
+          "Unable to instantiate class with 0 arguments: " + clazz.getName(), e);
+    } catch (IllegalAccessException e) {
+      throw new TezUncheckedException(
+          "Unable to instantiate class with 0 arguments: " + clazz.getName(), e);
+    }
+  }
+
+  /** Instantiates className via its no-arg constructor; failures throw TezUncheckedException. */
+  @SuppressWarnings("unchecked")
+  public static <T> T createClazzInstance(String className) {
+    return (T) getNewInstance(getClazz(className));
+  }
+}