You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ac...@apache.org on 2013/03/15 22:26:48 UTC

svn commit: r1457129 [16/38] - in /incubator/tez: ./ tez-ampool/ tez-ampool/src/ tez-ampool/src/main/ tez-ampool/src/main/bin/ tez-ampool/src/main/conf/ tez-ampool/src/main/java/ tez-ampool/src/main/java/org/ tez-ampool/src/main/java/org/apache/ tez-am...

Added: incubator/tez/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/reduce/ReduceProcessor.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/reduce/ReduceProcessor.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/reduce/ReduceProcessor.java (added)
+++ incubator/tez/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/reduce/ReduceProcessor.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,336 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.mapreduce.processor.reduce;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.io.RawComparator;
+import org.apache.hadoop.mapred.Counters.Counter;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.JobContext;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.util.Progress;
+import org.apache.hadoop.util.Progressable;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.tez.api.Input;
+import org.apache.tez.api.Master;
+import org.apache.tez.api.Output;
+import org.apache.tez.api.Processor;
+import org.apache.tez.common.TezJobConfig;
+import org.apache.tez.common.TezTask;
+import org.apache.tez.common.TezTaskStatus;
+import org.apache.tez.common.counters.TezCounter;
+import org.apache.tez.common.counters.TaskCounter;
+import org.apache.tez.engine.common.ConfigUtils;
+import org.apache.tez.engine.common.sort.impl.TezRawKeyValueIterator;
+import org.apache.tez.engine.lib.input.ShuffledMergedInput;
+import org.apache.tez.mapreduce.hadoop.TezTaskUmbilicalProtocol;
+import org.apache.tez.mapreduce.hadoop.mapred.TaskAttemptContextImpl;
+import org.apache.tez.mapreduce.input.SimpleInput;
+import org.apache.tez.mapreduce.output.SimpleOutput;
+import org.apache.tez.mapreduce.processor.MRTask;
+import org.apache.tez.mapreduce.processor.MRTaskReporter;
+import org.apache.tez.mapreduce.task.impl.MRTaskContext;
+
+import com.google.inject.Inject;
+import com.google.inject.assistedinject.Assisted;
+
+@SuppressWarnings({ "unchecked", "rawtypes" })
+public class ReduceProcessor
+extends MRTask
+implements Processor {
+
+  private static final Log LOG = LogFactory.getLog(ReduceProcessor.class);
+  
+  private Progress sortPhase;
+  private Progress reducePhase;
+
+  private Counter reduceInputKeyCounter;
+  private Counter reduceInputValueCounter;
+  private int numMapTasks;
+
+  /**
+   * Builds a reduce processor for the given task.
+   *
+   * @param context task description supplied through assisted injection; it
+   *        is cast to {@link MRTaskContext} to read the number of map tasks
+   */
+  @Inject
+  public ReduceProcessor(@Assisted TezTask context) {
+    super(context);
+    this.numMapTasks = ((MRTaskContext) context).getNumMapTasks();
+  }
+  
+  @Override
+  public void initialize(Configuration conf, Master master) throws IOException,
+      InterruptedException {
+    super.initialize(conf, master);
+    
+  }
+
+  /**
+   * Runs the reduce side of this task: hands the task to the input and
+   * output, initializes both, and then drives either the new-API or the
+   * old-API reducer over the shuffled input.
+   *
+   * @param in  the task input; must be a {@link ShuffledMergedInput}
+   * @param out the task output; it is cast to {@link SimpleOutput} when the
+   *            reducer is invoked
+   * @throws IOException if {@code in} is not a {@link ShuffledMergedInput},
+   *         if the new-API reducer class cannot be found, or if the reduce
+   *         itself fails
+   * @throws InterruptedException propagated from initialization or from the
+   *         reducer run
+   */
+  @Override
+  public void process(Input in, Output out)
+      throws IOException, InterruptedException {
+    MRTaskReporter reporter = new MRTaskReporter(getTaskReporter());
+    // This is the reduce side, so the API flavour must come from the reducer
+    // setting. Reading getUseNewMapper() here picked the wrong code path for
+    // jobs whose mapper and reducer use different APIs.
+    boolean useNewApi = jobConf.getUseNewReducer();
+    initTask(jobConf, getJobID(), reporter, useNewApi);
+
+    // Give the input/output a reference to this task where they accept one.
+    if (in instanceof SimpleInput) {
+      ((SimpleInput)in).setTask(this);
+    } else if (in instanceof ShuffledMergedInput) {
+      ((ShuffledMergedInput)in).setTask(this);
+    }
+
+    if (out instanceof SimpleOutput) {
+      ((SimpleOutput)out).setTask(this);
+    }
+
+    in.initialize(jobConf, getTaskReporter());
+    out.initialize(jobConf, getTaskReporter());
+
+    sortPhase  = getProgress().addPhase("sort");
+    reducePhase = getProgress().addPhase("reduce");
+    sortPhase.complete();                         // sort is complete
+    setPhase(TezTaskStatus.Phase.REDUCE);
+
+    this.statusUpdate();
+
+    Class keyClass = ConfigUtils.getMapOutputKeyClass(jobConf);
+    Class valueClass = ConfigUtils.getMapOutputValueClass(jobConf);
+    RawComparator comparator =
+        ConfigUtils.getOutputValueGroupingComparator(jobConf);
+
+    reduceInputKeyCounter =
+        reporter.getCounter(TaskCounter.REDUCE_INPUT_GROUPS);
+    reduceInputValueCounter =
+        reporter.getCounter(TaskCounter.REDUCE_INPUT_RECORDS);
+
+    // Sanity check
+    if (!(in instanceof ShuffledMergedInput)) {
+      throw new IOException("Illegal input to reduce: " + in.getClass());
+    }
+    ShuffledMergedInput shuffleInput = (ShuffledMergedInput)in;
+
+    // NOTE(review): out is cast to SimpleOutput below without a prior check;
+    // confirm that every module binding this processor supplies one.
+    if (useNewApi) {
+      try {
+        runNewReducer(
+            jobConf,
+            (TezTaskUmbilicalProtocol)getUmbilical(), reporter,
+            shuffleInput, comparator,  keyClass, valueClass,
+            (SimpleOutput) out);
+      } catch (ClassNotFoundException cnfe) {
+        throw new IOException(cnfe);
+      }
+    } else {
+      runOldReducer(
+          jobConf, (TezTaskUmbilicalProtocol)getUmbilical(), reporter,
+          shuffleInput, comparator, keyClass, valueClass, (SimpleOutput) out);
+    }
+
+    done(out.getOutputContext(), reporter);
+  }
+
+  /**
+   * Currently a no-op: the body is an unimplemented stub and releases
+   * nothing.
+   */
+  public void close() throws IOException, InterruptedException {
+    // TODO Auto-generated method stub
+    
+  }
+
+  /**
+   * Runs an old-API ({@code org.apache.hadoop.mapred}) reducer over the
+   * shuffled input, writing every collected pair to {@code output}.
+   *
+   * <p>The reducer and the output are each closed once on every exit path,
+   * including failures that are not {@link IOException}s. After a successful
+   * run a failure to close is propagated; after a failed run an
+   * {@link IOException} from closing is logged and dropped so that the
+   * original error is the one the caller sees.
+   *
+   * @param job         job configuration used to instantiate the reducer and
+   *                    to look up the grouping comparator
+   * @param umbilical   not used by this method
+   * @param reporter    reporter handed to the reducer and the value iterator
+   * @param input       shuffled input providing the raw key/value iterator
+   * @param comparator  not used by this method; the grouping comparator is
+   *                    read from {@code job} instead
+   * @param keyClass    class of the reduce input keys
+   * @param valueClass  class of the reduce input values
+   * @param output      sink for the records the reducer collects
+   * @throws IOException if iterating, reducing or closing fails
+   * @throws InterruptedException declared for callers; propagated from the
+   *         output where it raises it
+   */
+  void runOldReducer(JobConf job,
+      TezTaskUmbilicalProtocol umbilical,
+      final MRTaskReporter reporter,
+      ShuffledMergedInput input,
+      RawComparator comparator,
+      Class keyClass,
+      Class valueClass,
+      final SimpleOutput output) throws IOException, InterruptedException {
+
+    Reducer reducer =
+        ReflectionUtils.newInstance(job.getReducerClass(), job);
+
+    // Adapter that lets the old-API reducer emit records into the output.
+    OutputCollector collector =
+        new OutputCollector() {
+      public void collect(Object key, Object value)
+          throws IOException {
+        try {
+          output.write(key, value);
+        } catch (InterruptedException ie) {
+          throw new IOException(ie);
+        }
+      }
+    };
+
+    // Track what has already been closed so the finally block neither skips
+    // nor repeats a close() call.
+    boolean reducerClosed = false;
+    boolean outputClosed = false;
+
+    // apply reduce function
+    try {
+      ReduceValuesIterator values =
+          new ReduceValuesIterator(
+              input,
+              job.getOutputValueGroupingComparator(), keyClass, valueClass,
+              job, reporter, reduceInputValueCounter, reducePhase);
+
+      values.informReduceProgress();
+      while (values.more()) {
+        reduceInputKeyCounter.increment(1);
+        reducer.reduce(values.getKey(), values, collector, reporter);
+        values.nextKey();
+        values.informReduceProgress();
+      }
+
+      // Normal completion: close failures are real errors and propagate.
+      // Each flag is raised before the call so that a close() which throws
+      // is not attempted a second time below.
+      reducerClosed = true;
+      reducer.close();
+      outputClosed = true;
+      output.close();
+    } finally {
+      // Failure path (any exception type): best-effort cleanup.
+      if (!reducerClosed) {
+        try {
+          reducer.close();
+        } catch (IOException e) {
+          LOG.warn(
+              "Ignoring failure to close reducer after reduce failure", e);
+        }
+      }
+      if (!outputClosed) {
+        try {
+          output.close();
+        } catch (IOException e) {
+          LOG.warn(
+              "Ignoring failure to close output after reduce failure", e);
+        }
+      }
+    }
+  }
+  
+  /**
+   * Values iterator used by the old-API reducer path. On top of the base
+   * iterator it bumps a counter for every value handed out and can push the
+   * underlying iterator's progress into the reduce phase.
+   */
+  private static class ReduceValuesIterator<KEY,VALUE>
+  extends org.apache.tez.engine.common.task.impl.ValuesIterator<KEY,VALUE> {
+    /** Incremented once for each value returned by {@link #next()}. */
+    private final Counter valueCounter;
+    /** Phase whose progress is updated by {@link #informReduceProgress()}. */
+    private final Progress phase;
+
+    public ReduceValuesIterator (ShuffledMergedInput in,
+        RawComparator<KEY> comparator,
+        Class<KEY> keyClass,
+        Class<VALUE> valClass,
+        Configuration conf, Progressable reporter,
+        Counter reduceInputValueCounter,
+        Progress reducePhase)
+            throws IOException {
+      super(in.getIterator(), comparator, keyClass, valClass, conf, reporter);
+      this.valueCounter = reduceInputValueCounter;
+      this.phase = reducePhase;
+    }
+
+    /** Counts the value, then returns it via {@link #moveToNext()}. */
+    @Override
+    public VALUE next() {
+      this.valueCounter.increment(1);
+      return moveToNext();
+    }
+
+    /** Advances the base iterator without touching the counter. */
+    protected VALUE moveToNext() {
+      return super.next();
+    }
+
+    /** Copies the input's progress into the phase and pings the reporter. */
+    public void informReduceProgress() {
+      this.phase.set(super.in.getProgress().getProgress()); // update progress
+      reporter.progress();
+    }
+  }
+
+  /**
+   * Runs a new-API ({@code org.apache.hadoop.mapreduce}) reducer over the
+   * shuffled input, writing through a record writer that forwards to
+   * {@code out}.
+   *
+   * <p>The record writer is closed on every exit path. After a successful
+   * run a failure to close is propagated; if the reducer itself failed, an
+   * {@link IOException} from closing is logged and dropped so that the
+   * reducer's own failure is the one the caller sees.
+   *
+   * @param job         job configuration used to build the task context and
+   *                    to instantiate the reducer
+   * @param umbilical   not used by this method
+   * @param reporter    receives progress after every record and is passed to
+   *                    the reduce context
+   * @param input       shuffled input providing the raw key/value iterator
+   * @param comparator  comparator passed to the reduce context
+   * @param keyClass    class of the reduce input keys
+   * @param valueClass  class of the reduce input values
+   * @param out         sink for the records the reducer writes
+   * @throws IOException if reading, reducing, writing or closing fails
+   * @throws InterruptedException if the reducer or the writer is interrupted
+   * @throws ClassNotFoundException if the reducer class cannot be resolved
+   */
+  void runNewReducer(JobConf job,
+      final TezTaskUmbilicalProtocol umbilical,
+      final MRTaskReporter reporter,
+      ShuffledMergedInput input,
+      RawComparator comparator,
+      Class keyClass,
+      Class valueClass,
+      final SimpleOutput out
+      ) throws IOException,InterruptedException,
+      ClassNotFoundException {
+    // wrap value iterator to report progress.
+    final TezRawKeyValueIterator rawIter = input.getIterator();
+    TezRawKeyValueIterator rIter = new TezRawKeyValueIterator() {
+      public void close() throws IOException {
+        rawIter.close();
+      }
+      public DataInputBuffer getKey() throws IOException {
+        return rawIter.getKey();
+      }
+      public Progress getProgress() {
+        return rawIter.getProgress();
+      }
+      public DataInputBuffer getValue() throws IOException {
+        return rawIter.getValue();
+      }
+      public boolean next() throws IOException {
+        boolean ret = rawIter.next();
+        reporter.setProgress(rawIter.getProgress().getProgress());
+        return ret;
+      }
+    };
+
+    // make a task context so we can get the classes
+    org.apache.hadoop.mapreduce.TaskAttemptContext taskContext =
+        new TaskAttemptContextImpl(job, getTaskAttemptId(), reporter);
+
+    // make a reducer
+    org.apache.hadoop.mapreduce.Reducer reducer =
+        (org.apache.hadoop.mapreduce.Reducer)
+        ReflectionUtils.newInstance(taskContext.getReducerClass(), job);
+
+    // Record writer that simply forwards to the task output.
+    org.apache.hadoop.mapreduce.RecordWriter trackedRW =
+        new org.apache.hadoop.mapreduce.RecordWriter() {
+
+          @Override
+          public void write(Object key, Object value) throws IOException,
+              InterruptedException {
+            out.write(key, value);
+          }
+
+          @Override
+          public void close(TaskAttemptContext context) throws IOException,
+              InterruptedException {
+            out.close();
+          }
+        };
+
+    org.apache.hadoop.mapreduce.Reducer.Context reducerContext =
+        createReduceContext(
+            reducer, job, getTaskAttemptId(),
+            rIter, reduceInputKeyCounter,
+            reduceInputValueCounter,
+            trackedRW,
+            committer,
+            reporter, comparator, keyClass,
+            valueClass);
+
+    boolean writerClosed = false;
+    try {
+      reducer.run(reducerContext);
+      // Raise the flag before closing so that a close() which throws is not
+      // attempted a second time in the finally block.
+      writerClosed = true;
+      trackedRW.close(reducerContext);
+    } finally {
+      // Reached with the flag unset only when reducer.run() failed.
+      if (!writerClosed) {
+        try {
+          trackedRW.close(reducerContext);
+        } catch (IOException e) {
+          LOG.warn(
+              "Ignoring failure to close output after reduce failure", e);
+        }
+      }
+    }
+  }
+
+  /**
+   * Applies the superclass localization and then records two reduce-side
+   * settings: the task is flagged as not being a map task, and the task
+   * in-degree is set to the number of map tasks.
+   *
+   * @param jobConf configuration to update in place
+   */
+  @Override
+  public void localizeConfiguration(JobConf jobConf)
+      throws IOException, InterruptedException {
+    super.localizeConfiguration(jobConf);
+    jobConf.setInt(TezJobConfig.TEZ_ENGINE_TASK_INDEGREE, numMapTasks);
+    jobConf.setBoolean(JobContext.TASK_ISMAP, false);
+  }
+
+  /**
+   * Returns the counter the reporter keeps for
+   * {@code TaskCounter.REDUCE_OUTPUT_RECORDS}.
+   */
+  @Override
+  public TezCounter getOutputRecordsCounter() {
+    return reporter.getCounter(TaskCounter.REDUCE_OUTPUT_RECORDS);
+  }
+
+  /**
+   * Returns the counter the reporter keeps for
+   * {@code TaskCounter.REDUCE_INPUT_GROUPS}.
+   *
+   * <p>NOTE(review): the method name talks about input records, yet the
+   * counter returned is the groups counter rather than
+   * {@code REDUCE_INPUT_RECORDS} - confirm with the callers whether this is
+   * intentional.
+   */
+  @Override
+  public TezCounter getInputRecordsCounter() {
+    return reporter.getCounter(TaskCounter.REDUCE_INPUT_GROUPS);
+  }
+}

Added: incubator/tez/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/task/FinalTask.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/task/FinalTask.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/task/FinalTask.java (added)
+++ incubator/tez/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/task/FinalTask.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.mapreduce.task;
+
+import org.apache.tez.api.Input;
+import org.apache.tez.api.Output;
+import org.apache.tez.api.Processor;
+import org.apache.tez.api.Task;
+import org.apache.tez.engine.lib.input.ShuffledMergedInput;
+import org.apache.tez.engine.runtime.InputFactory;
+import org.apache.tez.engine.runtime.TezEngineFactory;
+import org.apache.tez.engine.runtime.TezEngineFactoryImpl;
+import org.apache.tez.engine.runtime.OutputFactory;
+import org.apache.tez.engine.runtime.ProcessorFactory;
+import org.apache.tez.engine.runtime.TaskFactory;
+import org.apache.tez.engine.task.RuntimeTask;
+import org.apache.tez.mapreduce.output.SimpleOutput;
+import org.apache.tez.mapreduce.processor.reduce.ReduceProcessor;
+
+import com.google.inject.AbstractModule;
+import com.google.inject.assistedinject.FactoryModuleBuilder;
+
+public class FinalTask extends AbstractModule {
+
+  @Override
+  protected void configure() {    
+    install(
+        new FactoryModuleBuilder().implement(
+            Output.class, SimpleOutput.class).
+        build(OutputFactory.class)
+        );
+    install(
+        new FactoryModuleBuilder().implement(
+            Input.class, ShuffledMergedInput.class).
+        build(InputFactory.class)
+        );
+    install(
+        new FactoryModuleBuilder().implement(
+            Processor.class, ReduceProcessor.class).
+        build(ProcessorFactory.class)
+        );
+    install(
+        new FactoryModuleBuilder().implement(
+            Task.class, RuntimeTask.class).
+        build(TaskFactory.class)
+        );
+    
+    bind(TezEngineFactory.class).to(TezEngineFactoryImpl.class);
+  }
+
+}

Added: incubator/tez/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/task/InitialTask.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/task/InitialTask.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/task/InitialTask.java (added)
+++ incubator/tez/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/task/InitialTask.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.mapreduce.task;
+
+import org.apache.tez.api.Input;
+import org.apache.tez.api.Output;
+import org.apache.tez.api.Processor;
+import org.apache.tez.api.Task;
+import org.apache.tez.engine.lib.output.OnFileSortedOutput;
+import org.apache.tez.engine.runtime.InputFactory;
+import org.apache.tez.engine.runtime.TezEngineFactory;
+import org.apache.tez.engine.runtime.TezEngineFactoryImpl;
+import org.apache.tez.engine.runtime.OutputFactory;
+import org.apache.tez.engine.runtime.ProcessorFactory;
+import org.apache.tez.engine.runtime.TaskFactory;
+import org.apache.tez.engine.task.RuntimeTask;
+import org.apache.tez.mapreduce.input.SimpleInput;
+import org.apache.tez.mapreduce.processor.map.MapProcessor;
+
+import com.google.inject.AbstractModule;
+import com.google.inject.assistedinject.FactoryModuleBuilder;
+
+public class InitialTask extends AbstractModule {
+
+  @Override
+  protected void configure() {    
+    install(
+        new FactoryModuleBuilder().implement(
+            Input.class, SimpleInput.class).
+        build(InputFactory.class)
+        );
+    install(
+        new FactoryModuleBuilder().implement(
+            Output.class, OnFileSortedOutput.class).
+        build(OutputFactory.class)
+        );
+    install(
+        new FactoryModuleBuilder().implement(
+            Processor.class, MapProcessor.class).
+        build(ProcessorFactory.class)
+        );
+    install(
+        new FactoryModuleBuilder().implement(
+            Task.class, RuntimeTask.class).
+        build(TaskFactory.class)
+        );
+    
+    bind(TezEngineFactory.class).to(TezEngineFactoryImpl.class);
+  }
+
+}

Added: incubator/tez/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/task/InitialTaskWithInMemSort.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/task/InitialTaskWithInMemSort.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/task/InitialTaskWithInMemSort.java (added)
+++ incubator/tez/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/task/InitialTaskWithInMemSort.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.mapreduce.task;
+
+import org.apache.tez.api.Input;
+import org.apache.tez.api.Output;
+import org.apache.tez.api.Processor;
+import org.apache.tez.api.Task;
+import org.apache.tez.engine.lib.output.InMemorySortedOutput;
+import org.apache.tez.engine.runtime.InputFactory;
+import org.apache.tez.engine.runtime.TezEngineFactory;
+import org.apache.tez.engine.runtime.TezEngineFactoryImpl;
+import org.apache.tez.engine.runtime.OutputFactory;
+import org.apache.tez.engine.runtime.ProcessorFactory;
+import org.apache.tez.engine.runtime.TaskFactory;
+import org.apache.tez.engine.task.RuntimeTask;
+import org.apache.tez.mapreduce.input.SimpleInput;
+import org.apache.tez.mapreduce.processor.map.MapProcessor;
+
+import com.google.inject.AbstractModule;
+import com.google.inject.assistedinject.FactoryModuleBuilder;
+
+public class InitialTaskWithInMemSort extends AbstractModule {
+// TODO EVENTUALLY - have all types subclass a single parent instead of AbstractModule.
+  
+  @Override
+  protected void configure() {    
+    install(
+        new FactoryModuleBuilder().implement(
+            Input.class, SimpleInput.class).
+        build(InputFactory.class)
+        );
+    install(
+        new FactoryModuleBuilder().implement(
+            Output.class, InMemorySortedOutput.class).
+        build(OutputFactory.class)
+        );
+    install(
+        new FactoryModuleBuilder().implement(
+            Processor.class, MapProcessor.class).
+        build(ProcessorFactory.class)
+        );
+    install(
+        new FactoryModuleBuilder().implement(
+            Task.class, RuntimeTask.class).
+        build(TaskFactory.class)
+        );
+    
+    bind(TezEngineFactory.class).to(TezEngineFactoryImpl.class);
+  }
+
+}

Added: incubator/tez/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/task/InitialTaskWithLocalSort.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/task/InitialTaskWithLocalSort.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/task/InitialTaskWithLocalSort.java (added)
+++ incubator/tez/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/task/InitialTaskWithLocalSort.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.mapreduce.task;
+
+import org.apache.tez.api.Input;
+import org.apache.tez.api.Output;
+import org.apache.tez.api.Processor;
+import org.apache.tez.api.Task;
+import org.apache.tez.engine.lib.output.LocalOnFileSorterOutput;
+import org.apache.tez.engine.runtime.InputFactory;
+import org.apache.tez.engine.runtime.TezEngineFactory;
+import org.apache.tez.engine.runtime.TezEngineFactoryImpl;
+import org.apache.tez.engine.runtime.OutputFactory;
+import org.apache.tez.engine.runtime.ProcessorFactory;
+import org.apache.tez.engine.runtime.TaskFactory;
+import org.apache.tez.engine.task.RuntimeTask;
+import org.apache.tez.mapreduce.input.SimpleInput;
+import org.apache.tez.mapreduce.processor.map.MapProcessor;
+
+import com.google.inject.AbstractModule;
+import com.google.inject.assistedinject.FactoryModuleBuilder;
+
+public class InitialTaskWithLocalSort extends AbstractModule {
+// TODO EVENTUALLY - have all types subclass a single parent instead of AbstractModule.
+  
+  @Override
+  protected void configure() {    
+    install(
+        new FactoryModuleBuilder().implement(
+            Input.class, SimpleInput.class).
+        build(InputFactory.class)
+        );
+    install(
+        new FactoryModuleBuilder().implement(
+            Output.class, LocalOnFileSorterOutput.class).
+        build(OutputFactory.class)
+        );
+    install(
+        new FactoryModuleBuilder().implement(
+            Processor.class, MapProcessor.class).
+        build(ProcessorFactory.class)
+        );
+    install(
+        new FactoryModuleBuilder().implement(
+            Task.class, RuntimeTask.class).
+        build(TaskFactory.class)
+        );
+    
+    bind(TezEngineFactory.class).to(TezEngineFactoryImpl.class);
+  }
+
+}

Added: incubator/tez/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/task/IntermediateTask.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/task/IntermediateTask.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/task/IntermediateTask.java (added)
+++ incubator/tez/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/task/IntermediateTask.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.mapreduce.task;
+
+import org.apache.tez.api.Input;
+import org.apache.tez.api.Output;
+import org.apache.tez.api.Processor;
+import org.apache.tez.api.Task;
+import org.apache.tez.engine.lib.input.ShuffledMergedInput;
+import org.apache.tez.engine.lib.output.OnFileSortedOutput;
+import org.apache.tez.engine.runtime.InputFactory;
+import org.apache.tez.engine.runtime.TezEngineFactory;
+import org.apache.tez.engine.runtime.TezEngineFactoryImpl;
+import org.apache.tez.engine.runtime.OutputFactory;
+import org.apache.tez.engine.runtime.ProcessorFactory;
+import org.apache.tez.engine.runtime.TaskFactory;
+import org.apache.tez.engine.task.RuntimeTask;
+import org.apache.tez.mapreduce.processor.reduce.ReduceProcessor;
+
+import com.google.inject.AbstractModule;
+import com.google.inject.assistedinject.FactoryModuleBuilder;
+
+
+public class IntermediateTask extends AbstractModule {
+
+  @Override
+  protected void configure() {
+    install(
+        new FactoryModuleBuilder().implement(
+            Input.class, ShuffledMergedInput.class).
+        build(InputFactory.class)
+        );
+    install(
+        new FactoryModuleBuilder().implement(
+            Output.class, OnFileSortedOutput.class).
+        build(OutputFactory.class)
+        );
+    install(
+        new FactoryModuleBuilder().implement(
+            Processor.class, ReduceProcessor.class).
+        build(ProcessorFactory.class)
+        );
+    install(
+        new FactoryModuleBuilder().implement(
+            Task.class, RuntimeTask.class).
+        build(TaskFactory.class)
+        );
+
+    bind(TezEngineFactory.class).to(TezEngineFactoryImpl.class);
+  }
+
+}

Added: incubator/tez/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/task/LocalFinalTask.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/task/LocalFinalTask.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/task/LocalFinalTask.java (added)
+++ incubator/tez/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/task/LocalFinalTask.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.mapreduce.task;
+
+import org.apache.tez.api.Input;
+import org.apache.tez.api.Output;
+import org.apache.tez.api.Processor;
+import org.apache.tez.api.Task;
+import org.apache.tez.engine.lib.input.LocalMergedInput;
+import org.apache.tez.engine.runtime.InputFactory;
+import org.apache.tez.engine.runtime.TezEngineFactory;
+import org.apache.tez.engine.runtime.TezEngineFactoryImpl;
+import org.apache.tez.engine.runtime.OutputFactory;
+import org.apache.tez.engine.runtime.ProcessorFactory;
+import org.apache.tez.engine.runtime.TaskFactory;
+import org.apache.tez.engine.task.RuntimeTask;
+import org.apache.tez.mapreduce.output.SimpleOutput;
+import org.apache.tez.mapreduce.processor.reduce.ReduceProcessor;
+
+import com.google.inject.AbstractModule;
+import com.google.inject.assistedinject.FactoryModuleBuilder;
+
+public class LocalFinalTask extends AbstractModule {
+
+  @Override
+  protected void configure() {    
+    install(
+        new FactoryModuleBuilder().implement(
+            Output.class, SimpleOutput.class).
+        build(OutputFactory.class)
+        );
+    install(
+        new FactoryModuleBuilder().implement(
+            Input.class, LocalMergedInput.class).
+        build(InputFactory.class)
+        );
+    install(
+        new FactoryModuleBuilder().implement(
+            Processor.class, ReduceProcessor.class).
+        build(ProcessorFactory.class)
+        );
+    install(
+        new FactoryModuleBuilder().implement(
+            Task.class, RuntimeTask.class).
+        build(TaskFactory.class)
+        );
+    
+    bind(TezEngineFactory.class).to(TezEngineFactoryImpl.class);
+  }
+
+}

Added: incubator/tez/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/task/MapOnlyTask.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/task/MapOnlyTask.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/task/MapOnlyTask.java (added)
+++ incubator/tez/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/task/MapOnlyTask.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.mapreduce.task;
+
+import org.apache.tez.api.Input;
+import org.apache.tez.api.Output;
+import org.apache.tez.api.Processor;
+import org.apache.tez.api.Task;
+import org.apache.tez.engine.runtime.InputFactory;
+import org.apache.tez.engine.runtime.TezEngineFactory;
+import org.apache.tez.engine.runtime.TezEngineFactoryImpl;
+import org.apache.tez.engine.runtime.OutputFactory;
+import org.apache.tez.engine.runtime.ProcessorFactory;
+import org.apache.tez.engine.runtime.TaskFactory;
+import org.apache.tez.engine.task.RuntimeTask;
+import org.apache.tez.mapreduce.input.SimpleInput;
+import org.apache.tez.mapreduce.output.SimpleOutput;
+import org.apache.tez.mapreduce.processor.map.MapProcessor;
+
+import com.google.inject.AbstractModule;
+import com.google.inject.assistedinject.FactoryModuleBuilder;
+
+/**
+ * Guice module that binds the assisted-inject factories used to assemble a
+ * task: {@code Input} is implemented by {@code SimpleInput}, {@code Output}
+ * by {@code SimpleOutput}, {@code Processor} by {@code MapProcessor} and
+ * {@code Task} by {@code RuntimeTask}. {@code TezEngineFactory} is bound to
+ * {@code TezEngineFactoryImpl}.
+ */
+public class MapOnlyTask extends AbstractModule {
+
+  @Override
+  protected void configure() {
+    installFactory(Input.class, SimpleInput.class, InputFactory.class);
+    installFactory(Output.class, SimpleOutput.class, OutputFactory.class);
+    installFactory(Processor.class, MapProcessor.class,
+        ProcessorFactory.class);
+    installFactory(Task.class, RuntimeTask.class, TaskFactory.class);
+
+    bind(TezEngineFactory.class).to(TezEngineFactoryImpl.class);
+  }
+
+  /**
+   * Installs an assisted-inject factory module.
+   *
+   * @param api the interface returned by the factory
+   * @param impl the concrete type the factory instantiates
+   * @param factory the factory interface to build
+   */
+  private <T> void installFactory(Class<T> api, Class<? extends T> impl,
+      Class<?> factory) {
+    install(new FactoryModuleBuilder().implement(api, impl).build(factory));
+  }
+
+}

Added: incubator/tez/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/task/impl/MRTaskContext.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/task/impl/MRTaskContext.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/task/impl/MRTaskContext.java (added)
+++ incubator/tez/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/task/impl/MRTaskContext.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.tez.mapreduce.task.impl;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import javax.crypto.SecretKey;
+
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.tez.common.TezTask;
+import org.apache.tez.records.TezTaskAttemptID;
+
+// TODO XXX Writable serialization for now.
+public class MRTaskContext extends TezTask {
+  /** Split location; written and read back as a string. */
+  private String splitLocation;
+  /** Split offset; written and read back as a long. */
+  private long splitOffset;
+  /** Number of map tasks; the only field that can be changed after construction. */
+  private int numMapTasks;
+
+  /** No-arg constructor, required for serialization. */
+  public MRTaskContext() {
+  }
+
+  /**
+   * Creates a populated context. The first four arguments are handed to the
+   * {@code TezTask} constructor unchanged.
+   *
+   * @param jobToken accepted for signature compatibility; it is neither
+   *          stored nor used by this constructor
+   */
+  // TODO TEZAM5 Remove jobToken from the constructor.
+  public MRTaskContext(TezTaskAttemptID taskAttemptId, String user,
+      String jobName, String moduleClassName, SecretKey jobToken,
+      String splitLocation, long splitOffset, int numMapTasks) {
+    super(taskAttemptId, user, jobName, moduleClassName);
+    this.numMapTasks = numMapTasks;
+    this.splitOffset = splitOffset;
+    this.splitLocation = splitLocation;
+  }
+
+  /** @return the split location */
+  public String getSplitLocation() {
+    return this.splitLocation;
+  }
+
+  /** @return the split offset */
+  public long getSplitOffset() {
+    return this.splitOffset;
+  }
+
+  /** @return the number of map tasks */
+  public int getNumMapTasks() {
+    return numMapTasks;
+  }
+
+  /** @param numMapTasks the new number of map tasks */
+  public void setNumMapTasks(int numMapTasks) {
+    this.numMapTasks = numMapTasks;
+  }
+
+  /**
+   * Serializes the superclass state followed by split location, split offset
+   * and number of map tasks, in that order.
+   */
+  @Override
+  public void write(DataOutput out) throws IOException {
+    super.write(out);
+    // TODO VERIFY That split location needs to be serialized.
+    WritableUtils.writeString(out, this.splitLocation);
+    out.writeLong(this.splitOffset);
+    out.writeInt(this.numMapTasks);
+  }
+
+  /**
+   * Restores the superclass state and then the fields in the same order in
+   * which {@link #write(DataOutput)} emitted them.
+   */
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    super.readFields(in);
+    this.splitLocation = WritableUtils.readString(in);
+    this.splitOffset = in.readLong();
+    this.numMapTasks = in.readInt();
+  }
+}

Added: incubator/tez/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/task/impl/YarnOutputFiles.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/task/impl/YarnOutputFiles.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/task/impl/YarnOutputFiles.java (added)
+++ incubator/tez/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/task/impl/YarnOutputFiles.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,239 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.mapreduce.task.impl;
+
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.LocalDirAllocator;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapOutputFile;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.MRConfig;
+import org.apache.tez.common.Constants;
+
+/**
+ * Manipulate the working area for the transient store for maps and reduces.
+ *
+ * This class is used by map and reduce tasks to identify the directories that
+ * they need to write to/read from for intermediate files. The callers of
+ * these methods are from child space.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class YarnOutputFiles extends MapOutputFile {
+
+  private JobConf conf;
+
+  private static final String JOB_OUTPUT_DIR = "output";
+  private static final String SPILL_FILE_PATTERN = "%s_spill_%d.out";
+  private static final String SPILL_INDEX_FILE_PATTERN = SPILL_FILE_PATTERN
+      + ".index";
+
+  /** No-arg constructor; it leaves the configuration unset. */
+  public YarnOutputFiles() {
+  }
+
+  // assume configured to $localdir/usercache/$user/appcache/$appId
+  private LocalDirAllocator lDirAlloc = 
+    new LocalDirAllocator(MRConfig.LOCAL_DIR);
+
+  private Path getAttemptOutputDir() {
+    return new Path(JOB_OUTPUT_DIR, conf.get(JobContext.TASK_ATTEMPT_ID));
+  }
+  
+  /**
+   * Looks up the map output file of the current attempt for reading.
+   *
+   * @return the path resolved by the local-dir allocator
+   * @throws IOException if the allocator lookup fails
+   */
+  public Path getOutputFile() throws IOException {
+    final String relativeName = new Path(getAttemptOutputDir(),
+        Constants.MAP_OUTPUT_FILENAME_STRING).toString();
+    return lDirAlloc.getLocalPathToRead(relativeName, conf);
+  }
+
+  /**
+   * Asks the local-dir allocator for a writable location for the map output
+   * file of the current attempt.
+   *
+   * @param size the size of the file, passed on to the allocator
+   * @return the path chosen by the allocator
+   * @throws IOException if the allocator cannot provide a path
+   */
+  public Path getOutputFileForWrite(long size) throws IOException {
+    final String relativeName = new Path(getAttemptOutputDir(),
+        Constants.MAP_OUTPUT_FILENAME_STRING).toString();
+    return lDirAlloc.getLocalPathForWrite(relativeName, size, conf);
+  }
+
+  /**
+   * Derives a map output file name next to an existing file: the result is
+   * {@code <parent of existing>/output/<attempt id>/<map output file name>}.
+   *
+   * @param existing a path whose parent directory anchors the result
+   * @return the derived path; the allocator is not consulted
+   */
+  public Path getOutputFileForWriteInVolume(Path existing) {
+    final Path volumeOutputDir = new Path(existing.getParent(), JOB_OUTPUT_DIR);
+    final String attemptId = conf.get(JobContext.TASK_ATTEMPT_ID);
+    final Path attemptDir = new Path(volumeOutputDir, attemptId);
+    return new Path(attemptDir, Constants.MAP_OUTPUT_FILENAME_STRING);
+  }
+
+  /**
+   * Looks up the map output index file of the current attempt for reading.
+   *
+   * @return the path resolved by the local-dir allocator
+   * @throws IOException if the allocator lookup fails
+   */
+  public Path getOutputIndexFile() throws IOException {
+    final String indexFileName = Constants.MAP_OUTPUT_FILENAME_STRING
+        + Constants.MAP_OUTPUT_INDEX_SUFFIX_STRING;
+    final Path relativeIndex = new Path(getAttemptOutputDir(), indexFileName);
+    return lDirAlloc.getLocalPathToRead(relativeIndex.toString(), conf);
+  }
+
+  /**
+   * Asks the local-dir allocator for a writable location for the map output
+   * index file of the current attempt.
+   *
+   * @param size the size of the file, passed on to the allocator
+   * @return the path chosen by the allocator
+   * @throws IOException if the allocator cannot provide a path
+   */
+  public Path getOutputIndexFileForWrite(long size) throws IOException {
+    final String indexFileName = Constants.MAP_OUTPUT_FILENAME_STRING
+        + Constants.MAP_OUTPUT_INDEX_SUFFIX_STRING;
+    final Path relativeIndex = new Path(getAttemptOutputDir(), indexFileName);
+    return lDirAlloc.getLocalPathForWrite(relativeIndex.toString(), size, conf);
+  }
+
+  /**
+   * Derives a map output index file name next to an existing file: the
+   * result is
+   * {@code <parent of existing>/output/<attempt id>/<map output file name><index suffix>}.
+   *
+   * @param existing a path whose parent directory anchors the result
+   * @return the derived path; the allocator is not consulted
+   */
+  public Path getOutputIndexFileForWriteInVolume(Path existing) {
+    final Path volumeOutputDir = new Path(existing.getParent(), JOB_OUTPUT_DIR);
+    final String attemptId = conf.get(JobContext.TASK_ATTEMPT_ID);
+    final Path attemptDir = new Path(volumeOutputDir, attemptId);
+    final String indexFileName = Constants.MAP_OUTPUT_FILENAME_STRING
+        + Constants.MAP_OUTPUT_INDEX_SUFFIX_STRING;
+    return new Path(attemptDir, indexFileName);
+  }
+
+  /**
+   * Looks up an existing spill file, named
+   * {@code <attempt id>_spill_<spillNumber>.out}, for reading.
+   *
+   * @param spillNumber the spill number embedded in the file name
+   * @return the path resolved by the local-dir allocator
+   * @throws IOException if the allocator lookup fails
+   */
+  public Path getSpillFile(int spillNumber) throws IOException {
+    final String spillName = String.format(SPILL_FILE_PATTERN,
+        conf.get(JobContext.TASK_ATTEMPT_ID), spillNumber);
+    return lDirAlloc.getLocalPathToRead(spillName, conf);
+  }
+
+  /**
+   * Asks the local-dir allocator for a writable location for a spill file
+   * named {@code <attempt id>_spill_<spillNumber>.out}.
+   *
+   * @param spillNumber the spill number embedded in the file name
+   * @param size the size of the file, passed on to the allocator
+   * @return the path chosen by the allocator
+   * @throws IOException if the allocator cannot provide a path
+   */
+  public Path getSpillFileForWrite(int spillNumber, long size)
+      throws IOException {
+    // Format exactly once, as getSpillFile does. Feeding the formatted name
+    // back into String.format would treat any '%' in the attempt id as a
+    // format specifier.
+    final String spillName = String.format(SPILL_FILE_PATTERN,
+        conf.get(JobContext.TASK_ATTEMPT_ID), spillNumber);
+    return lDirAlloc.getLocalPathForWrite(spillName, size, conf);
+  }
+
+  /**
+   * Looks up an existing spill index file, named
+   * {@code <attempt id>_spill_<spillNumber>.out.index}, for reading.
+   *
+   * @param spillNumber the spill number embedded in the file name
+   * @return the path resolved by the local-dir allocator
+   * @throws IOException if the allocator lookup fails
+   */
+  public Path getSpillIndexFile(int spillNumber) throws IOException {
+    final String indexName = String.format(SPILL_INDEX_FILE_PATTERN,
+        conf.get(JobContext.TASK_ATTEMPT_ID), spillNumber);
+    return lDirAlloc.getLocalPathToRead(indexName, conf);
+  }
+
+  /**
+   * Asks the local-dir allocator for a writable location for a spill index
+   * file named {@code <attempt id>_spill_<spillNumber>.out.index}.
+   *
+   * @param spillNumber the spill number embedded in the file name
+   * @param size the size of the file, passed on to the allocator
+   * @return the path chosen by the allocator
+   * @throws IOException if the allocator cannot provide a path
+   */
+  public Path getSpillIndexFileForWrite(int spillNumber, long size)
+      throws IOException {
+    final String indexName = String.format(SPILL_INDEX_FILE_PATTERN,
+        conf.get(JobContext.TASK_ATTEMPT_ID), spillNumber);
+    return lDirAlloc.getLocalPathForWrite(indexName, size, conf);
+  }
+
+  /**
+   * Not supported by this implementation: every call fails.
+   *
+   * @param mapId a map task id (ignored)
+   * @return never returns normally
+   * @throws IOException declared for signature compatibility only
+   * @throws UnsupportedOperationException always
+   */
+  public Path getInputFile(int mapId) throws IOException {
+    final String reason = "Incompatible with LocalRunner";
+    throw new UnsupportedOperationException(reason);
+  }
+
+  /**
+   * Asks the local-dir allocator for a writable location for a reduce input
+   * file. The name is produced by applying
+   * {@code Constants.REDUCE_INPUT_FILE_FORMAT_STRING} to the attempt output
+   * directory and the numeric id of the map task.
+   *
+   * @param mapId a map task id; only {@code mapId.getId()} is used
+   * @param size the size of the file, passed on to the allocator
+   * @return the path chosen by the allocator
+   * @throws IOException if the allocator cannot provide a path
+   */
+  public Path getInputFileForWrite(org.apache.hadoop.mapreduce.TaskID mapId,
+      long size) throws IOException {
+    final String inputName = String.format(
+        Constants.REDUCE_INPUT_FILE_FORMAT_STRING,
+        getAttemptOutputDir().toString(), mapId.getId());
+    return lDirAlloc.getLocalPathForWrite(inputName, size, conf);
+  }
+
+  /**
+   * Not supported by this implementation: every call fails instead of
+   * removing the files related to a task.
+   *
+   * @throws IOException declared for signature compatibility only
+   * @throws UnsupportedOperationException always
+   */
+  public void removeAll() throws IOException {
+    final String reason = "Incompatible with LocalRunner";
+    throw new UnsupportedOperationException(reason);
+  }
+
+  /**
+   * Stores the configuration. A {@code JobConf} is kept as-is; any other
+   * {@code Configuration} is wrapped in a new {@code JobConf}.
+   */
+  @Override
+  public void setConf(Configuration conf) {
+    this.conf = (conf instanceof JobConf) ? (JobConf) conf : new JobConf(conf);
+  }
+
+  /** @return the configuration last stored by {@code setConf}, or null if none was set */
+  @Override
+  public Configuration getConf() {
+    return this.conf;
+  }
+  
+}

Added: incubator/tez/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/task/impl/YarnTezChild.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/task/impl/YarnTezChild.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/task/impl/YarnTezChild.java (added)
+++ incubator/tez/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/task/impl/YarnTezChild.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,459 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.mapreduce.task.impl;
+
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.PrintStream;
+import java.net.InetSocketAddress;
+import java.net.URI;
+import java.security.PrivilegedExceptionAction;
+import java.util.ArrayList;
+import java.util.List;
+
+import javax.crypto.SecretKey;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSError;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalDirAllocator;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.mapred.FileAlreadyExistsException;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapOutputFile;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.MRConfig;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.filecache.DistributedCache;
+import org.apache.hadoop.mapreduce.security.TokenCache;
+import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
+import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
+import org.apache.hadoop.metrics2.source.JvmMetrics;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.SecurityUtil;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.util.DiskChecker.DiskErrorException;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.yarn.YarnException;
+import org.apache.hadoop.yarn.YarnUncaughtExceptionHandler;
+import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.apache.log4j.LogManager;
+import org.apache.tez.api.Task;
+import org.apache.tez.common.TezJobConfig;
+import org.apache.tez.common.TezTask;
+import org.apache.tez.engine.runtime.TezEngineFactory;
+import org.apache.tez.mapreduce.hadoop.ContainerContext;
+import org.apache.tez.mapreduce.hadoop.ContainerTask;
+import org.apache.tez.mapreduce.hadoop.DeprecatedKeys;
+import org.apache.tez.mapreduce.hadoop.IDConverter;
+import org.apache.tez.mapreduce.hadoop.MRTaskType;
+import org.apache.tez.mapreduce.hadoop.TezTaskUmbilicalProtocol;
+import org.apache.tez.mapreduce.processor.MRTask;
+import org.apache.tez.records.TezJobID;
+import org.apache.tez.records.TezTaskAttemptID;
+
+import com.google.inject.AbstractModule;
+import com.google.inject.Guice;
+import com.google.inject.Injector;
+
+/**
+ * The main() for TEZ MapReduce task processes.
+ */
+public class YarnTezChild {
+
+  private static final Log LOG = LogFactory.getLog(YarnTezChild.class);
+
+  /**
+   * Entry point of the child JVM. It connects to the umbilical endpoint,
+   * then repeatedly polls it for a task and runs each task it receives,
+   * until the returned task says the container should die.
+   *
+   * @param args {@code [0]} host and {@code [1]} port of the umbilical
+   *          endpoint, {@code [2]} MapReduce job id, {@code [3]} name of an
+   *          {@code MRTaskType} constant, {@code [4]} container id
+   * @throws Throwable anything escaping the handlers below, including
+   *           failures while reporting an error over the umbilical
+   */
+  public static void main(String[] args) throws Throwable {
+    Thread.setDefaultUncaughtExceptionHandler(new YarnUncaughtExceptionHandler());
+    LOG.debug("Child starting");
+
+    DeprecatedKeys.init();
+
+    final JobConf defaultConf = new JobConf();
+    defaultConf.addResource(MRJobConfig.JOB_CONF_FILE);
+    UserGroupInformation.setConfiguration(defaultConf);
+
+    String host = args[0];
+    int port = Integer.parseInt(args[1]);
+    final InetSocketAddress address =
+        NetUtils.createSocketAddrForHost(host, port);
+
+    final TezJobID jobID = IDConverter.fromMRJobId(JobID.forName(args[2]));
+
+    final MRTaskType taskType = MRTaskType.valueOf(args[3]);
+
+    final ContainerId containerId = ConverterUtils.toContainerId(args[4]);
+
+    // initialize metrics
+    DefaultMetricsSystem.initialize(
+        StringUtils.camelize(taskType.name() +"Task"));
+
+    // Security framework already loaded the tokens into current ugi
+    Credentials credentials =
+        UserGroupInformation.getCurrentUser().getCredentials();
+    LOG.info("Executing with tokens:");
+    for (Token<?> token: credentials.getAllTokens()) {
+      LOG.info(token);
+    }
+
+    // Create TaskUmbilicalProtocol as actual task owner.
+    UserGroupInformation taskOwner =
+      UserGroupInformation.createRemoteUser(jobID.toString());
+    Token<JobTokenIdentifier> jt = TokenCache.getJobToken(credentials);
+    SecurityUtil.setTokenService(jt, address);
+    taskOwner.addToken(jt);
+    final TezTaskUmbilicalProtocol umbilical =
+      taskOwner.doAs(new PrivilegedExceptionAction<TezTaskUmbilicalProtocol>() {
+      @Override
+      public TezTaskUmbilicalProtocol run() throws Exception {
+        return (TezTaskUmbilicalProtocol)RPC.getProxy(TezTaskUmbilicalProtocol.class,
+            TezTaskUmbilicalProtocol.versionID, address, defaultConf);
+      }
+    });
+
+    // report non-pid to application master
+    String pid = System.getenv().get("JVM_PID");
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("PID, containerId: " + pid + ", " + containerId);
+    }
+    MRTaskContext taskContext = null;
+    ContainerTask containerTask = null;
+    UserGroupInformation childUGI = null;
+    TezTaskAttemptID taskAttemptId = null;
+    MRTask task = null;
+    ContainerContext containerContext = new ContainerContext(containerId, pid);
+
+    try {
+      while (true) {
+        // poll for new task; the wait grows by 500ms per attempt, capped at 1500ms
+        for (int idle = 0; null == containerTask; ++idle) {
+          long sleepTimeMilliSecs = Math.min(idle * 500, 1500);
+          LOG.info("Sleeping for " + sleepTimeMilliSecs
+              + "ms before retrying again. Got null now.");
+          MILLISECONDS.sleep(sleepTimeMilliSecs);
+          containerTask = umbilical.getTask(containerContext);
+        }
+        LOG.info("TaskInfo: shouldDie: "
+            + containerTask.shouldDie()
+            + (containerTask.shouldDie() ? "" : ", taskAttemptId: "
+                + containerTask.getMrTaskContext().getTaskAttemptId()));
+
+        if (containerTask.shouldDie()) {
+          return;
+        }
+        taskContext = containerTask.getMrTaskContext();
+
+        taskAttemptId = taskContext.getTaskAttemptId();
+
+        final Task t = createAndConfigureTezTask(taskContext, umbilical,
+            credentials, jt);
+        task = (MRTask) t.getProcessor();
+        final JobConf job = task.getConf();
+
+        // Initiate Java VM metrics
+        JvmMetrics.initSingleton(containerId.toString(), job.getSessionId());
+        childUGI = UserGroupInformation.createRemoteUser(System
+            .getenv(ApplicationConstants.Environment.USER.toString()));
+        // Add tokens to new user so that it may execute its task correctly.
+        childUGI.addCredentials(credentials);
+
+        childUGI.doAs(new PrivilegedExceptionAction<Object>() {
+          @Override
+          public Object run() throws Exception {
+            runTezTask(t, umbilical, job); // run the task
+            return null;
+          }
+        });
+        FileSystem.closeAllForUGI(childUGI);
+        // Clearing the reference makes the next iteration poll again.
+        containerTask = null;
+      }
+    } catch (FSError e) {
+      LOG.fatal("FSError from child", e);
+      // Same guard as the other handlers: nothing to report against until a
+      // task attempt has been received.
+      if (taskAttemptId != null) {
+        umbilical.fsError(taskAttemptId, e.getMessage());
+      }
+    } catch (Exception exception) {
+      LOG.warn("Exception running child : "
+          + StringUtils.stringifyException(exception));
+      try {
+        if (task != null) {
+          // do cleanup for the task
+          if (childUGI == null) { // no need to jump into a doAs block
+            task.taskCleanup(umbilical);
+          } else {
+            final MRTask taskFinal = task;
+            childUGI.doAs(new PrivilegedExceptionAction<Object>() {
+              @Override
+              public Object run() throws Exception {
+                taskFinal.taskCleanup(umbilical);
+                return null;
+              }
+            });
+          }
+        }
+      } catch (Exception e) {
+        LOG.info("Exception cleaning up: " + StringUtils.stringifyException(e));
+      }
+      // Report back any failures, for diagnostic purposes
+      ByteArrayOutputStream baos = new ByteArrayOutputStream();
+      exception.printStackTrace(new PrintStream(baos));
+      if (taskAttemptId != null) {
+        umbilical.fatalError(taskAttemptId, baos.toString());
+      }
+    } catch (Throwable throwable) {
+      LOG.fatal("Error running child : "
+          + StringUtils.stringifyException(throwable));
+      if (taskAttemptId != null) {
+        Throwable tCause = throwable.getCause();
+        String cause = tCause == null
+                                 ? throwable.getMessage()
+                                 : StringUtils.stringifyException(tCause);
+        umbilical.fatalError(taskAttemptId, cause);
+      }
+    } finally {
+      RPC.stopProxy(umbilical);
+      DefaultMetricsSystem.shutdown();
+      // Shutting down log4j of the child-vm...
+      // This assumes that on return from Task.run()
+      // there is no more logging done.
+      LogManager.shutdown();
+    }
+  }
+
+  /**
+   * Configure mapred-local dirs. This config is used by the task for finding
+   * out an output directory.
+   * @throws IOException 
+   */
+  private static void configureLocalDirs(MRTask task, JobConf job) throws IOException {
+    String[] localSysDirs = StringUtils.getTrimmedStrings(
+        System.getenv(ApplicationConstants.LOCAL_DIR_ENV));
+    job.setStrings(TezJobConfig.LOCAL_DIR, localSysDirs);
+    LOG.info(TezJobConfig.LOCAL_DIR + " for child: " +
+        job.get(TezJobConfig.LOCAL_DIR));
+    LocalDirAllocator lDirAlloc = new LocalDirAllocator(TezJobConfig.LOCAL_DIR);
+    Path workDir = null;
+    // First, try to find the JOB_LOCAL_DIR on this host.
+    try {
+      workDir = lDirAlloc.getLocalPathToRead("work", job);
+    } catch (DiskErrorException e) {
+      // DiskErrorException means dir not found. If not found, it will
+      // be created below.
+    }
+    if (workDir == null) {
+      // JOB_LOCAL_DIR doesn't exist on this host -- Create it.
+      workDir = lDirAlloc.getLocalPathForWrite("work", job);
+      FileSystem lfs = FileSystem.getLocal(job).getRaw();
+      boolean madeDir = false;
+      try {
+        madeDir = lfs.mkdirs(workDir);
+      } catch (FileAlreadyExistsException e) {
+        // Since all tasks will be running in their own JVM, the race condition
+        // exists where multiple tasks could be trying to create this directory
+        // at the same time. If this task loses the race, it's okay because
+        // the directory already exists.
+        madeDir = true;
+        workDir = lDirAlloc.getLocalPathToRead("work", job);
+      }
+      if (!madeDir) {
+          throw new IOException("Mkdirs failed to create "
+              + workDir.toString());
+      }
+    }
+    job.set(MRJobConfig.JOB_LOCAL_DIR,workDir.toString());
+  }
+
+  private static JobConf configureTask(MRTask task, Credentials credentials,
+      Token<JobTokenIdentifier> jt) throws IOException, InterruptedException {
+    JobConf job = task.getConf();
+    
+    String appAttemptIdEnv = System
+        .getenv(MRJobConfig.APPLICATION_ATTEMPT_ID_ENV);
+    LOG.debug("APPLICATION_ATTEMPT_ID: " + appAttemptIdEnv);
+    // Set it in conf, so as to be able to be used the the OutputCommitter.
+    job.setInt(MRJobConfig.APPLICATION_ATTEMPT_ID, Integer
+        .parseInt(appAttemptIdEnv));
+
+    // set tcp nodelay
+    job.setBoolean("ipc.client.tcpnodelay", true);
+    job.setClass(MRConfig.TASK_LOCAL_OUTPUT_CLASS,
+        YarnOutputFiles.class, MapOutputFile.class);
+    // set the jobTokenFile into task
+    SecretKey sk = JobTokenSecretManager.createSecretKey(jt.getPassword());
+
+    task.setJobTokenSecret(sk);
+//    task.setJobTokenSecret(
+//        JobTokenSecretManager.createSecretKey(jt.getPassword()));
+
+    // setup the child's MRConfig.LOCAL_DIR.
+    configureLocalDirs(task, job);
+
+    // setup the child's attempt directories
+    // Do the task-type specific localization
+    task.localizeConfiguration(job);
+
+    // Set up the DistributedCache related configs
+    setupDistributedCacheConfig(job);
+
+    // Overwrite the localized task jobconf which is linked to in the current
+    // work-dir.
+    Path localTaskFile = new Path(MRJobConfig.JOB_CONF_FILE);
+    writeLocalJobFile(localTaskFile, job);
+    task.setConf(job);
+    return job;
+  }
+
+  /**
+   * Set up the DistributedCache related configs to make
+   * {@link DistributedCache#getLocalCacheFiles(Configuration)}
+   * and
+   * {@link DistributedCache#getLocalCacheArchives(Configuration)}
+   * working.
+   * @param job
+   * @throws IOException
+   */
+  private static void setupDistributedCacheConfig(final JobConf job)
+      throws IOException {
+
+    String localWorkDir = System.getenv("PWD");
+    //        ^ ^ all symlinks are created in the current work-dir
+
+    // Update the configuration object with localized archives.
+    URI[] cacheArchives = DistributedCache.getCacheArchives(job);
+    if (cacheArchives != null) {
+      List<String> localArchives = new ArrayList<String>();
+      for (int i = 0; i < cacheArchives.length; ++i) {
+        URI u = cacheArchives[i];
+        Path p = new Path(u);
+        Path name =
+            new Path((null == u.getFragment()) ? p.getName()
+                : u.getFragment());
+        String linkName = name.toUri().getPath();
+        localArchives.add(new Path(localWorkDir, linkName).toUri().getPath());
+      }
+      if (!localArchives.isEmpty()) {
+        job.set(MRJobConfig.CACHE_LOCALARCHIVES, StringUtils
+            .arrayToString(localArchives.toArray(new String[localArchives
+                .size()])));
+      }
+    }
+
+    // Update the configuration object with localized files.
+    URI[] cacheFiles = DistributedCache.getCacheFiles(job);
+    if (cacheFiles != null) {
+      List<String> localFiles = new ArrayList<String>();
+      for (int i = 0; i < cacheFiles.length; ++i) {
+        URI u = cacheFiles[i];
+        Path p = new Path(u);
+        Path name =
+            new Path((null == u.getFragment()) ? p.getName()
+                : u.getFragment());
+        String linkName = name.toUri().getPath();
+        localFiles.add(new Path(localWorkDir, linkName).toUri().getPath());
+      }
+      if (!localFiles.isEmpty()) {
+        job.set(MRJobConfig.CACHE_LOCALFILES,
+            StringUtils.arrayToString(localFiles
+                .toArray(new String[localFiles.size()])));
+      }
+    }
+  }
+
+  private static final FsPermission urw_gr =
+    FsPermission.createImmutable((short) 0640);
+
+  /**
+   * Write the task specific job-configuration file: any existing file at
+   * {@code jobFile} on the local file system is deleted, then the
+   * configuration is written there as XML with the {@code urw_gr}
+   * permission.
+   *
+   * @param jobFile destination on the local file system
+   * @param conf the configuration to write
+   * @throws IOException if deleting, creating or writing the file fails
+   */
+  private static void writeLocalJobFile(Path jobFile, JobConf conf)
+      throws IOException {
+    FileSystem localFs = FileSystem.getLocal(conf);
+    // Explicit form of the deprecated delete(Path), which delegates to
+    // delete(path, true).
+    localFs.delete(jobFile, true);
+    OutputStream out = null;
+    try {
+      out = FileSystem.create(localFs, jobFile, urw_gr);
+      conf.writeXml(out);
+    } finally {
+      // Closes the stream if it was opened.
+      IOUtils.cleanup(LOG, out);
+    }
+  }
+
+  private static Task createAndConfigureTezTask(
+      TezTask taskContext,
+      TezTaskUmbilicalProtocol master, 
+      Credentials credentials, Token<JobTokenIdentifier> jt) 
+      throws IOException, InterruptedException {
+    final JobConf job = new JobConf(MRJobConfig.JOB_CONF_FILE);
+    job.setCredentials(credentials);
+    
+    // Create the appropriate guice task-module
+    AbstractModule taskModule = null;
+    LOG.info("Using Module: " + taskContext.getTaskModuleClassName());
+    try {
+      Class<?> moduleClazz = Class
+          .forName(taskContext.getTaskModuleClassName());
+      if (AbstractModule.class.isAssignableFrom(moduleClazz)) {
+        taskModule = (AbstractModule) ReflectionUtils.newInstance(moduleClazz,
+            job);
+      } else {
+        throw new YarnException("Module class: " + moduleClazz.getName()
+            + " should be an instance of "
+            + AbstractModule.class.getCanonicalName());
+      }
+    } catch (ClassNotFoundException e) {
+      throw new YarnException("Unable to load moduleClass: "
+          + taskContext.getTaskModuleClassName(), e);
+    }
+
+    // Use the injector to create & bind input, processor, output & task
+    Injector injector = Guice.createInjector(taskModule);
+    TezEngineFactory factory = injector.getInstance(TezEngineFactory.class);
+    Task t = factory.createTask(taskContext);
+    t.initialize(job, master);
+    
+    MRTask task = (MRTask)t.getProcessor();
+    configureTask(task, credentials, jt);
+    
+    return t;
+  }
+  
+  private static void runTezTask(
+      Task t, TezTaskUmbilicalProtocol master, JobConf job) 
+  throws IOException, InterruptedException {
+    // use job-specified working directory
+    FileSystem.get(job).setWorkingDirectory(job.getWorkingDirectory());
+    
+    // Run!
+    t.run();
+    t.close();
+  }
+}

Added: incubator/tez/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/TestTaskModules.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/TestTaskModules.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/TestTaskModules.java (added)
+++ incubator/tez/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/TestTaskModules.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,243 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.mapreduce;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.tez.api.Input;
+import org.apache.tez.api.Master;
+import org.apache.tez.api.Output;
+import org.apache.tez.api.Processor;
+import org.apache.tez.api.Task;
+import org.apache.tez.engine.lib.input.ShuffledMergedInput;
+import org.apache.tez.engine.lib.output.OnFileSortedOutput;
+import org.apache.tez.engine.runtime.InputFactory;
+import org.apache.tez.engine.runtime.TezEngineFactory;
+import org.apache.tez.engine.runtime.TezEngineFactoryImpl;
+import org.apache.tez.engine.runtime.OutputFactory;
+import org.apache.tez.engine.runtime.ProcessorFactory;
+import org.apache.tez.engine.runtime.TaskFactory;
+import org.apache.tez.mapreduce.hadoop.MRTaskType;
+import org.apache.tez.mapreduce.input.SimpleInput;
+import org.apache.tez.mapreduce.output.SimpleOutput;
+import org.apache.tez.mapreduce.processor.map.MapProcessor;
+import org.apache.tez.mapreduce.processor.reduce.ReduceProcessor;
+import org.apache.tez.mapreduce.task.FinalTask;
+import org.apache.tez.mapreduce.task.InitialTask;
+import org.apache.tez.mapreduce.task.IntermediateTask;
+import org.apache.tez.mapreduce.task.impl.MRTaskContext;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.google.inject.Guice;
+import com.google.inject.Inject;
+import com.google.inject.Injector;
+import com.google.inject.assistedinject.Assisted;
+import com.google.inject.assistedinject.FactoryModuleBuilder;
+
+/**
+ * Smoke tests that wire a {@link Task} together through Guice using each of
+ * the three test modules declared below (initial, intermediate and final).
+ *
+ * <p>Every test builds an injector, asks the {@link TezEngineFactory} for a
+ * task and calls {@code initialize} on it. There are no explicit assertions,
+ * so a test passes as long as no exception is thrown.
+ */
+public class TestTaskModules {
+
+  private static final Log LOG = LogFactory.getLog(TestTaskModules.class);
+
+  /** Task context handed to the factory; rebuilt before each test. */
+  MRTaskContext taskContext;
+
+  /** Job configuration passed to {@code Task.initialize}. */
+  JobConf job;
+
+  /** Builds a context around a mocked REDUCE attempt id and a new JobConf. */
+  @Before
+  public void setUp() {
+    taskContext = new MRTaskContext(
+        TezTestUtils.getMockTaskAttemptId(0, 0, 0, MRTaskType.REDUCE),
+        "tez", "tez", TestInitialModule.class.getName(), null, "", 0, 0);
+    job = new JobConf();
+  }
+
+  /** Creates and initializes a task wired by {@link TestInitialModule}. */
+  @Test
+  public void testInitialTask() throws Exception {
+    Injector guice = Guice.createInjector(new TestInitialModule());
+    Task initialTask =
+        guice.getInstance(TezEngineFactory.class).createTask(taskContext);
+    initialTask.initialize(job, null);
+  }
+
+  /** Creates and initializes a task wired by {@link TestIntermediateModule}. */
+  @Test
+  public void testIntermediateTask() throws Exception {
+    Injector guice = Guice.createInjector(new TestIntermediateModule());
+    Task intermediateTask =
+        guice.getInstance(TezEngineFactory.class).createTask(taskContext);
+    intermediateTask.initialize(job, null);
+  }
+
+  /** Creates, logs and initializes a task wired by {@link TestFinalModule}. */
+  @Test
+  public void testFinalTask() throws Exception {
+    Injector guice = Guice.createInjector(new TestFinalModule());
+    Task finalTask =
+        guice.getInstance(TezEngineFactory.class).createTask(taskContext);
+    // Unlike the other two tests, this one also logs the concrete Task class.
+    LOG.info("task = " + finalTask.getClass());
+    finalTask.initialize(job, null);
+  }
+
+  /**
+   * Bare-bones {@link Task} that only stores the injected processor, input
+   * and output; {@link #run()} and {@link #close()} are no-ops.
+   */
+  static class TestTask implements Task {
+
+    private final Processor processor;
+    private final Input in;
+    private final Output out;
+
+    /** Receives all three collaborators as {@code @Assisted} parameters. */
+    @Inject
+    public TestTask(
+        @Assisted Processor processor,
+        @Assisted Input in,
+        @Assisted Output out) {
+      this.processor = processor;
+      this.in = in;
+      this.out = out;
+    }
+
+    /** Logs the runtime classes of the injected input, processor and output. */
+    @Override
+    public void initialize(Configuration conf, Master master)
+        throws IOException, InterruptedException {
+      LOG.info("in = " + in.getClass());
+      LOG.info("processor = " + processor.getClass());
+      LOG.info("out = " + out.getClass());
+    }
+
+    @Override
+    public Input getInput() {
+      return in;
+    }
+
+    @Override
+    public Output getOutput() {
+      return out;
+    }
+
+    @Override
+    public Processor getProcessor() {
+      return processor;
+    }
+
+    @Override
+    public void run() throws IOException, InterruptedException {
+      // No-op stub.
+    }
+
+    @Override
+    public void close() throws IOException, InterruptedException {
+      // No-op stub.
+    }
+  }
+
+  /**
+   * Binds {@link SimpleInput}, {@link OnFileSortedOutput} and
+   * {@link MapProcessor} as the input, output and processor implementations,
+   * and {@link TestTask} as the task.
+   */
+  static class TestInitialModule extends InitialTask {
+
+    @Override
+    protected void configure() {
+      install(new FactoryModuleBuilder()
+          .implement(Input.class, SimpleInput.class)
+          .build(InputFactory.class));
+      install(new FactoryModuleBuilder()
+          .implement(Output.class, OnFileSortedOutput.class)
+          .build(OutputFactory.class));
+      install(new FactoryModuleBuilder()
+          .implement(Processor.class, MapProcessor.class)
+          .build(ProcessorFactory.class));
+      install(new FactoryModuleBuilder()
+          .implement(Task.class, TestTask.class)
+          .build(TaskFactory.class));
+
+      bind(TezEngineFactory.class).to(TezEngineFactoryImpl.class);
+    }
+  }
+
+  /**
+   * Binds {@link ShuffledMergedInput}, {@link OnFileSortedOutput} and
+   * {@link ReduceProcessor} as the input, output and processor
+   * implementations, and {@link TestTask} as the task.
+   */
+  static class TestIntermediateModule extends IntermediateTask {
+
+    @Override
+    protected void configure() {
+      install(new FactoryModuleBuilder()
+          .implement(Input.class, ShuffledMergedInput.class)
+          .build(InputFactory.class));
+      install(new FactoryModuleBuilder()
+          .implement(Output.class, OnFileSortedOutput.class)
+          .build(OutputFactory.class));
+      install(new FactoryModuleBuilder()
+          .implement(Processor.class, ReduceProcessor.class)
+          .build(ProcessorFactory.class));
+      install(new FactoryModuleBuilder()
+          .implement(Task.class, TestTask.class)
+          .build(TaskFactory.class));
+
+      bind(TezEngineFactory.class).to(TezEngineFactoryImpl.class);
+    }
+  }
+
+  /**
+   * Binds {@link SimpleOutput}, {@link ShuffledMergedInput} and
+   * {@link ReduceProcessor} as the output, input and processor
+   * implementations, and {@link TestTask} as the task.
+   */
+  static class TestFinalModule extends FinalTask {
+
+    @Override
+    protected void configure() {
+      install(new FactoryModuleBuilder()
+          .implement(Output.class, SimpleOutput.class)
+          .build(OutputFactory.class));
+      install(new FactoryModuleBuilder()
+          .implement(Input.class, ShuffledMergedInput.class)
+          .build(InputFactory.class));
+      install(new FactoryModuleBuilder()
+          .implement(Processor.class, ReduceProcessor.class)
+          .build(ProcessorFactory.class));
+      install(new FactoryModuleBuilder()
+          .implement(Task.class, TestTask.class)
+          .build(TaskFactory.class));
+
+      bind(TezEngineFactory.class).to(TezEngineFactoryImpl.class);
+    }
+  }
+
+}

Added: incubator/tez/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/TestUmbilicalProtocol.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/TestUmbilicalProtocol.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/TestUmbilicalProtocol.java (added)
+++ incubator/tez/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/TestUmbilicalProtocol.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,150 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.mapreduce;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.ipc.ProtocolSignature;
+import org.apache.tez.common.TezTaskStatus;
+import org.apache.tez.mapreduce.hadoop.ContainerContext;
+import org.apache.tez.mapreduce.hadoop.ContainerTask;
+import org.apache.tez.mapreduce.hadoop.TezTaskUmbilicalProtocol;
+import org.apache.tez.mapreduce.hadoop.records.ProceedToCompletionResponse;
+import org.apache.tez.records.TezJobID;
+import org.apache.tez.records.TezTaskAttemptID;
+import org.apache.tez.records.TezTaskDependencyCompletionEventsUpdate;
+import org.apache.tez.records.OutputContext;
+
+/**
+ * Test stub of {@link TezTaskUmbilicalProtocol}.
+ *
+ * <p>Task notifications are logged at INFO level; the remaining calls do
+ * nothing or return fixed values. The {@code shouldLinger} flag chosen at
+ * construction determines the second argument of the canned
+ * {@link ProceedToCompletionResponse} and the answer given by
+ * {@link #canCommit}.
+ */
+public class TestUmbilicalProtocol implements TezTaskUmbilicalProtocol {
+
+  private static final Log LOG = LogFactory.getLog(TestUmbilicalProtocol.class);
+  private ProceedToCompletionResponse proceedToCompletionResponse;
+  private boolean shouldLinger;
+
+  /** Same as {@code new TestUmbilicalProtocol(false)}. */
+  public TestUmbilicalProtocol() {
+    this(false);
+  }
+
+  /**
+   * @param shouldLinger if true, {@link #canCommit} returns false and the
+   *     canned response is built with (false, false) instead of (false, true)
+   */
+  public TestUmbilicalProtocol(boolean shouldLinger) {
+    this.shouldLinger = shouldLinger;
+    this.proceedToCompletionResponse =
+        new ProceedToCompletionResponse(false, !shouldLinger);
+  }
+
+  /** Not implemented by this stub; always returns {@code null}. */
+  @Override
+  public ProtocolSignature getProtocolSignature(String arg0, long arg1, int arg2)
+      throws IOException {
+    return null;
+  }
+
+  /** Not implemented by this stub; always returns 0. */
+  @Override
+  public long getProtocolVersion(String arg0, long arg1) throws IOException {
+    return 0;
+  }
+
+  /** Not implemented by this stub; always returns {@code null}. */
+  @Override
+  public TezTaskDependencyCompletionEventsUpdate getDependentTasksCompletionEvents(
+      TezJobID jobID, int fromEventIdx, int maxEventsToFetch,
+      TezTaskAttemptID reduce) {
+    return null;
+  }
+
+  /** Not implemented by this stub; always returns {@code null}. */
+  @Override
+  public ContainerTask getTask(ContainerContext containerContext)
+      throws IOException {
+    return null;
+  }
+
+  /** Logs the status update and answers {@code true}. */
+  @Override
+  public boolean statusUpdate(TezTaskAttemptID taskId, TezTaskStatus taskStatus)
+      throws IOException, InterruptedException {
+    LOG.info("Got 'status-update' from " + taskId + ": status=" + taskStatus);
+    return true;
+  }
+
+  /** Logs the reported diagnostic trace. */
+  @Override
+  public void reportDiagnosticInfo(TezTaskAttemptID taskid, String trace)
+      throws IOException {
+    LOG.info("Got 'diagnostic-info' from " + taskid + ": trace=" + trace);
+  }
+
+  /** Logs the ping and answers {@code true}. */
+  @Override
+  public boolean ping(TezTaskAttemptID taskid) throws IOException {
+    LOG.info("Got 'ping' from " + taskid);
+    return true;
+  }
+
+  /** Logs the completion notification. */
+  @Override
+  public void done(TezTaskAttemptID taskid) throws IOException {
+    LOG.info("Got 'done' from " + taskid);
+  }
+
+  /** Logs the commit-pending notification together with the given status. */
+  @Override
+  public void commitPending(TezTaskAttemptID taskId, TezTaskStatus taskStatus)
+      throws IOException, InterruptedException {
+    LOG.info("Got 'commit-pending' from " + taskId + ": status=" + taskStatus);
+  }
+
+  /** Logs the request and answers the negation of {@code shouldLinger}. */
+  @Override
+  public boolean canCommit(TezTaskAttemptID taskid) throws IOException {
+    LOG.info("Got 'can-commit' from " + taskid);
+    return !shouldLinger;
+  }
+
+  /** Logs the reported shuffle error. */
+  @Override
+  public void shuffleError(TezTaskAttemptID taskId, String message)
+      throws IOException {
+    LOG.info("Got 'shuffle-error' from " + taskId + ": message=" + message);
+  }
+
+  /** Logs the reported file-system error. */
+  @Override
+  public void fsError(TezTaskAttemptID taskId, String message)
+      throws IOException {
+    LOG.info("Got 'fs-error' from " + taskId + ": message=" + message);
+  }
+
+  /** Logs the reported fatal error. */
+  @Override
+  public void fatalError(TezTaskAttemptID taskId, String message)
+      throws IOException {
+    LOG.info("Got 'fatal-error' from " + taskId + ": message=" + message);
+  }
+
+  /** Not implemented by this stub; the notification is ignored. */
+  @Override
+  public void outputReady(TezTaskAttemptID taskAttemptId,
+      OutputContext outputContext) throws IOException {
+    // Nothing to do.
+  }
+
+  /** Returns the response that was fixed when this stub was constructed. */
+  @Override
+  public ProceedToCompletionResponse proceedToCompletion(
+      TezTaskAttemptID taskAttemptId) throws IOException {
+    return proceedToCompletionResponse;
+  }
+
+}

Added: incubator/tez/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/TezTestUtils.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/TezTestUtils.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/TezTestUtils.java (added)
+++ incubator/tez/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/TezTestUtils.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.mapreduce;
+
+import org.apache.tez.mapreduce.hadoop.MRTaskType;
+import org.apache.tez.records.TezJobID;
+import org.apache.tez.records.TezTaskAttemptID;
+import org.apache.tez.records.TezTaskID;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/** Factory methods for Mockito-backed Tez job, task and task-attempt ids. */
+public class TezTestUtils {
+
+  /**
+   * Builds a mocked task-attempt id whose job and task ids are mocks as well.
+   *
+   * <p>The mock's {@code toString()} is stubbed to
+   * {@code attempt_tez_<jobId>_<m|r>_<taskId>_<taskAttemptId>}, using
+   * {@code m} for {@link MRTaskType#MAP} and {@code r} for any other type.
+   *
+   * @param jobId value returned by the mocked job id's {@code getId()}
+   * @param taskId value returned by the mocked task id's {@code getId()}
+   * @param taskAttemptId value returned by the attempt's {@code getId()}
+   * @param type task type; its {@code toString()} is returned by
+   *     {@code getTaskType()}
+   * @return the stubbed attempt id mock
+   */
+  public static TezTaskAttemptID getMockTaskAttemptId(
+      int jobId, int taskId, int taskAttemptId, MRTaskType type) {
+    // Build the nested mocks and the rendered name up front so that no other
+    // stubbing happens while a when(...) call below is still open.
+    TezTaskID parentTask = getMockTaskId(jobId, taskId, type);
+    TezJobID parentJob = parentTask.getJobID();
+    String typeCode = (type == MRTaskType.MAP) ? "m" : "r";
+    String rendered =
+        "attempt_tez_" + jobId + "_" + typeCode + "_" + taskId + "_"
+        + taskAttemptId;
+
+    TezTaskAttemptID attempt = mock(TezTaskAttemptID.class);
+    when(attempt.getJobID()).thenReturn(parentJob);
+    when(attempt.getTaskID()).thenReturn(parentTask);
+    when(attempt.getId()).thenReturn(taskAttemptId);
+    when(attempt.getTaskType()).thenReturn(type.toString());
+    when(attempt.toString()).thenReturn(rendered);
+    return attempt;
+  }
+
+  /**
+   * Builds a mocked task id that reports the given id and type and belongs to
+   * a mocked job id created by {@link #getMockJobId(int)}.
+   */
+  public static TezTaskID getMockTaskId(int jobId, int taskId, MRTaskType type) {
+    TezJobID owningJob = getMockJobId(jobId);
+    TezTaskID task = mock(TezTaskID.class);
+    when(task.getJobID()).thenReturn(owningJob);
+    when(task.getId()).thenReturn(taskId);
+    when(task.getTaskType()).thenReturn(type.toString());
+    return task;
+  }
+
+  /**
+   * Builds a mocked job id that reports the given id and the fixed
+   * job-tracker identifier {@code "mock"}.
+   */
+  public static TezJobID getMockJobId(int jobId) {
+    TezJobID job = mock(TezJobID.class);
+    when(job.getJtIdentifier()).thenReturn("mock");
+    when(job.getId()).thenReturn(jobId);
+    return job;
+  }
+}

Added: incubator/tez/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/MapUtils.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/MapUtils.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/MapUtils.java (added)
+++ incubator/tez/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/MapUtils.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,117 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.mapreduce.processor;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.spy;
+
+import java.io.IOException;
+import java.util.Random;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.SequenceFileInputFormat;
+import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitIndex;
+import org.apache.tez.api.Task;
+import org.apache.tez.engine.runtime.TezEngineFactory;
+import org.apache.tez.mapreduce.TezTestUtils;
+import org.apache.tez.mapreduce.hadoop.MRTaskType;
+import org.apache.tez.mapreduce.hadoop.TezTaskUmbilicalProtocol;
+import org.apache.tez.mapreduce.input.SimpleInput;
+import org.apache.tez.mapreduce.task.InitialTaskWithLocalSort;
+import org.apache.tez.mapreduce.task.impl.MRTaskContext;
+
+import com.google.inject.AbstractModule;
+import com.google.inject.Guice;
+import com.google.inject.Injector;
+
+/**
+ * Test helpers that generate a small sequence-file input and run a map-side
+ * {@link Task}'s processor over it.
+ */
+public class MapUtils {
+
+  private static final Log LOG = LogFactory.getLog(MapUtils.class);
+
+  /** Number of records written to the generated input file. */
+  private static final int NUM_RECORDS = 10;
+
+  /**
+   * Writes {@link #NUM_RECORDS} {@link LongWritable}/{@link Text} records to
+   * {@code file} and returns the first split computed for {@code workDir}.
+   *
+   * @param fs file system on which the input file is created
+   * @param workDir directory set as the job's input path
+   * @param job job configuration used for writing and for computing splits
+   * @param file path of the sequence file to create
+   * @return the first split reported by {@link SequenceFileInputFormat}
+   * @throws IOException if writing the file or computing splits fails, or if
+   *     no split is produced
+   */
+  private static InputSplit
+  createInputSplit(FileSystem fs, Path workDir, JobConf job, Path file)
+      throws IOException {
+    FileInputFormat.setInputPaths(job, workDir);
+
+    // Create a sequence file with NUM_RECORDS entries: random keys below 1000,
+    // values counting down from NUM_RECORDS to 1.
+    SequenceFile.Writer writer =
+        SequenceFile.createWriter(fs, job, file,
+            LongWritable.class, Text.class);
+    try {
+      Random r = new Random(System.currentTimeMillis());
+      LongWritable key = new LongWritable();
+      Text value = new Text();
+      for (int i = NUM_RECORDS; i > 0; i--) {
+        key.set(r.nextInt(1000));
+        value.set(Integer.toString(i));
+        writer.append(key, value);
+        LOG.info("<k, v> : <" + key.get() + ", " + value + ">");
+      }
+    } finally {
+      writer.close();
+    }
+
+    SequenceFileInputFormat<LongWritable, Text> format =
+        new SequenceFileInputFormat<LongWritable, Text>();
+    InputSplit[] splits = format.getSplits(job, 1);
+    if (splits.length == 0) {
+      // Fail with a clear message instead of an ArrayIndexOutOfBoundsException.
+      throw new IOException("No input splits generated for " + workDir);
+    }
+
+    InputSplit first = splits[0];
+    // A split may report no locations; do not let the diagnostic line crash.
+    String[] locations = first.getLocations();
+    String firstLocation = (locations.length > 0) ? locations[0] : "<none>";
+    LOG.info("#split = " + splits.length + " ; " +
+        "#locs = " + locations.length + "; " +
+        "loc = " + firstLocation + "; " +
+        "len = " + first.getLength() + "; " +
+        "file = " + ((FileSplit)first).getPath());
+    return first;
+  }
+
+  /**
+   * Creates a task through Guice, initializes it and runs its processor once
+   * over a freshly generated input split.
+   *
+   * <p>The job's input format is set to {@link SequenceFileInputFormat}. The
+   * task's {@link SimpleInput} is wrapped in a Mockito spy whose
+   * {@code getOldSplitDetails} returns the generated split for any
+   * {@link TaskSplitIndex}. The task context always names
+   * {@link InitialTaskWithLocalSort} as the module class, independently of
+   * the {@code taskModule} actually used to build the injector.
+   *
+   * @param fs file system on which the input file is created
+   * @param workDir directory set as the job's input path
+   * @param jobConf job configuration; modified by this call
+   * @param mapId task id used for the mocked map attempt id
+   * @param mapInput path of the sequence file to generate
+   * @param taskModule Guice module used to create the injector
+   * @param umbilical umbilical passed to {@code Task.initialize}
+   * @return the initialized task, after its processor has been invoked
+   * @throws Exception if input generation, wiring, initialization or
+   *     processing fails
+   */
+  public static Task runMapProcessor(FileSystem fs, Path workDir,
+      JobConf jobConf,
+      int mapId, Path mapInput, AbstractModule taskModule,
+      TezTaskUmbilicalProtocol umbilical)
+      throws Exception {
+    jobConf.setInputFormat(SequenceFileInputFormat.class);
+    InputSplit split = createInputSplit(fs, workDir, jobConf, mapInput);
+    MRTaskContext taskContext =
+        new MRTaskContext(
+            TezTestUtils.getMockTaskAttemptId(0, mapId, 0, MRTaskType.MAP),
+            "tez", "tez", InitialTaskWithLocalSort.class.getName(), null,
+            ((FileSplit)split).getPath().toString(), 0, 0);
+
+    Injector injector = Guice.createInjector(taskModule);
+    TezEngineFactory factory = injector.getInstance(TezEngineFactory.class);
+    Task t = factory.createTask(taskContext);
+    t.initialize(jobConf, umbilical);
+
+    // Spy on the real input so it hands back the split generated above.
+    SimpleInput real = ((SimpleInput)t.getInput());
+    SimpleInput in = spy(real);
+    doReturn(split).when(in).getOldSplitDetails(any(TaskSplitIndex.class));
+    t.getProcessor().process(in, t.getOutput());
+    return t;
+  }
+
+}