You are viewing a plain-text version of this content. The canonical link to it (originally a hyperlink on the word "here") was not preserved in this copy.
Posted to commits@tez.apache.org by ss...@apache.org on 2013/05/17 09:03:57 UTC

git commit: TEZ-62. Fix and enable tests in TestMapProcessor and TestReduceProcessor. (sseth)

Updated Branches:
  refs/heads/TEZ-1 38b410e31 -> cee79f638


TEZ-62. Fix and enable tests in TestMapProcessor and
TestReduceProcessor. (sseth)


Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/cee79f63
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/cee79f63
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/cee79f63

Branch: refs/heads/TEZ-1
Commit: cee79f6384940ca226cd0c2ef02e024f19e7550a
Parents: 38b410e
Author: Siddharth Seth <ss...@apache.org>
Authored: Fri May 17 00:03:20 2013 -0700
Committer: Siddharth Seth <ss...@apache.org>
Committed: Fri May 17 00:03:20 2013 -0700

----------------------------------------------------------------------
 .../java/org/apache/tez/common/TezJobConfig.java   |    7 +
 .../org/apache/hadoop/mapred/YarnTezDagChild.java  |   10 +-
 .../apache/tez/common/TezEngineTaskContext.java    |    4 +-
 .../mapreduce/split/SplitMetaInfoReaderTez.java    |   14 ++-
 .../tez/mapreduce/hadoop/TezTypeConverters.java    |    2 +
 .../org/apache/tez/mapreduce/processor/MRTask.java |    8 +-
 .../tez/mapreduce/processor/map/MapProcessor.java  |   14 ++-
 .../tez/mapreduce/TestUmbilicalProtocol.java       |    2 +-
 .../apache/tez/mapreduce/processor/MapUtils.java   |   85 ++++++++----
 .../mapreduce/processor/map/TestMapProcessor.java  |   79 +++++++++---
 .../processor/reduce/TestReduceProcessor.java      |  104 +++++++++++----
 11 files changed, 236 insertions(+), 93 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/cee79f63/tez-common/src/main/java/org/apache/tez/common/TezJobConfig.java
----------------------------------------------------------------------
diff --git a/tez-common/src/main/java/org/apache/tez/common/TezJobConfig.java b/tez-common/src/main/java/org/apache/tez/common/TezJobConfig.java
index e867458..4a7abea 100644
--- a/tez-common/src/main/java/org/apache/tez/common/TezJobConfig.java
+++ b/tez-common/src/main/java/org/apache/tez/common/TezJobConfig.java
@@ -68,6 +68,13 @@ public class TezJobConfig {
   public static final String DEFAULT_LOCAL_DIR = "/tmp";
 
   /**
+   * The directory which contains the localized files for this task.
+   */
+  @Private
+  public static final String TASK_LOCAL_RESOURCE_DIR = "tez.engine.task-local-resource.dir";
+  public static final String DEFAULT_TASK_LOCAL_RESOURCE_DIR = "/tmp";
+  
+  /**
    * 
    */
   public static final String TEZ_ENGINE_TASK_INDEGREE = 

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/cee79f63/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java b/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java
index f140c3a..2295b6a 100644
--- a/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java
+++ b/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java
@@ -249,12 +249,16 @@ public class YarnTezDagChild {
    * out an output directory.
    * @throws IOException 
    */
-  private static void configureLocalDirs(MRTask task, JobConf job) throws IOException {
+  private static void configureLocalDirs(JobConf job) throws IOException {
     String[] localSysDirs = StringUtils.getTrimmedStrings(
         System.getenv(Environment.LOCAL_DIRS.name()));
     job.setStrings(TezJobConfig.LOCAL_DIR, localSysDirs);
+    job.set(TezJobConfig.TASK_LOCAL_RESOURCE_DIR,
+        System.getenv(Environment.PWD.name()));
     LOG.info(TezJobConfig.LOCAL_DIR + " for child: " +
         job.get(TezJobConfig.LOCAL_DIR));
+    LOG.info(TezJobConfig.TASK_LOCAL_RESOURCE_DIR + " for child: "
+        + job.get(TezJobConfig.TASK_LOCAL_RESOURCE_DIR));
     LocalDirAllocator lDirAlloc = new LocalDirAllocator(TezJobConfig.LOCAL_DIR);
     Path workDir = null;
     // First, try to find the JOB_LOCAL_DIR on this host.
@@ -285,7 +289,7 @@ public class YarnTezDagChild {
       }
     }
     // TODO TEZ This likely needs fixing to make sure things work when there are multiple local-dirs etc.
-    job.set(MRJobConfig.JOB_LOCAL_DIR,workDir.toString());
+    job.set(MRJobConfig.JOB_LOCAL_DIR, workDir.toString());
   }
 
   private static JobConf configureTask(MRTask task, Credentials credentials,
@@ -308,7 +312,7 @@ public class YarnTezDagChild {
 //        JobTokenSecretManager.createSecretKey(jt.getPassword()));
 
     // setup the child's MRConfig.LOCAL_DIR.
-    configureLocalDirs(task, job);
+    configureLocalDirs(job);
 
     // setup the child's attempt directories
     // Do the task-type specific localization

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/cee79f63/tez-engine/src/main/java/org/apache/tez/common/TezEngineTaskContext.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/common/TezEngineTaskContext.java b/tez-engine/src/main/java/org/apache/tez/common/TezEngineTaskContext.java
index 350702a..5823de6 100644
--- a/tez-engine/src/main/java/org/apache/tez/common/TezEngineTaskContext.java
+++ b/tez-engine/src/main/java/org/apache/tez/common/TezEngineTaskContext.java
@@ -39,7 +39,7 @@ public class TezEngineTaskContext extends TezTaskContext {
   }
 
   public TezEngineTaskContext(TezTaskAttemptID taskAttemptID, String user,
-      String jobName, String vertexName, String moduleClassName,
+      String jobName, String vertexName, String processorName,
       List<InputSpec> inputSpecList, List<OutputSpec> outputSpecList) {
     super(taskAttemptID, user, jobName, vertexName);
     this.inputSpecList = inputSpecList;
@@ -52,7 +52,7 @@ public class TezEngineTaskContext extends TezTaskContext {
     }
     this.inputSpecList = inputSpecList;
     this.outputSpecList = outputSpecList;
-    this.processorName = moduleClassName;
+    this.processorName = processorName;
   }
 
   public String getProcessorName() {

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/cee79f63/tez-mapreduce/src/main/java/org/apache/hadoop/mapreduce/split/SplitMetaInfoReaderTez.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/hadoop/mapreduce/split/SplitMetaInfoReaderTez.java b/tez-mapreduce/src/main/java/org/apache/hadoop/mapreduce/split/SplitMetaInfoReaderTez.java
index de8d972..72246c0 100644
--- a/tez-mapreduce/src/main/java/org/apache/hadoop/mapreduce/split/SplitMetaInfoReaderTez.java
+++ b/tez-mapreduce/src/main/java/org/apache/hadoop/mapreduce/split/SplitMetaInfoReaderTez.java
@@ -33,6 +33,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.WritableUtils;
 import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
+import org.apache.tez.common.TezJobConfig;
 import org.apache.tez.mapreduce.hadoop.MRJobConfig;
 
 /**
@@ -44,6 +45,9 @@ public class SplitMetaInfoReaderTez {
 
   public static final Log LOG = LogFactory.getLog(SplitMetaInfoReaderTez.class);
 
+  public static final int META_SPLIT_VERSION = JobSplit.META_SPLIT_VERSION;
+  public static final byte[] META_SPLIT_FILE_HEADER = JobSplit.META_SPLIT_FILE_HEADER;
+
   // Forked from the MR variant so that the metaInfo file as well as the split
   // file can be read from local fs - relying on these files being localized.
   public static TaskSplitMetaInfo[] readSplitMetaInfo(Configuration conf,
@@ -53,9 +57,11 @@ public class SplitMetaInfoReaderTez {
         MRJobConfig.SPLIT_METAINFO_MAXSIZE,
         MRJobConfig.DEFAULT_SPLIT_METAINFO_MAXSIZE);
 
-    Path metaSplitFile = new Path(MRJobConfig.JOB_SPLIT_METAINFO);
+    Path metaSplitFile = new Path(
+        conf.get(TezJobConfig.TASK_LOCAL_RESOURCE_DIR),
+        MRJobConfig.JOB_SPLIT_METAINFO);
     String jobSplitFile = MRJobConfig.JOB_SPLIT;
-
+    
     File file = new File(metaSplitFile.toUri().getPath()).getAbsoluteFile();
     LOG.info("DEBUG: Setting up JobSplitIndex with JobSplitFile at: "
         + file.getAbsolutePath() + ", defaultFS from conf: "
@@ -83,12 +89,12 @@ public class SplitMetaInfoReaderTez {
       JobSplit.SplitMetaInfo splitMetaInfo = new JobSplit.SplitMetaInfo();
       splitMetaInfo.readFields(in);
       JobSplit.TaskSplitIndex splitIndex = new JobSplit.TaskSplitIndex(
-          jobSplitFile, splitMetaInfo.getStartOffset());
+          new Path(conf.get(TezJobConfig.TASK_LOCAL_RESOURCE_DIR), jobSplitFile)
+              .toUri().toString(), splitMetaInfo.getStartOffset());
       allSplitMetaInfo[i] = new JobSplit.TaskSplitMetaInfo(splitIndex,
           splitMetaInfo.getLocations(), splitMetaInfo.getInputDataLength());
     }
     in.close();
     return allSplitMetaInfo;
   }
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/cee79f63/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/TezTypeConverters.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/TezTypeConverters.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/TezTypeConverters.java
index 1fc71b0..768d347 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/TezTypeConverters.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/TezTypeConverters.java
@@ -34,6 +34,8 @@ import org.apache.tez.engine.records.TezDependentTaskCompletionEvent;
 
 public class TezTypeConverters {
 
+  // TODO Remove unused methods
+  
   // Tez objects will be imported. Others will use the fully qualified name when
   // required.
   // All public methods named toYarn / toTez / toMapReduce

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/cee79f63/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/MRTask.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/MRTask.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/MRTask.java
index d7d721f..276294c 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/MRTask.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/MRTask.java
@@ -149,7 +149,7 @@ extends RunningTaskContext {
     
     jobConf.set(Constants.TEZ_ENGINE_TASK_ATTEMPT_ID, 
         taskAttemptId.toString());
-    
+
     initResourceCalculatorPlugin();
     
     LOG.info("MRTask.inited: taskAttemptId = " + taskAttemptId.toString());
@@ -695,8 +695,10 @@ extends RunningTaskContext {
 
   public void localizeConfiguration(JobConf jobConf) 
       throws IOException, InterruptedException {
-    jobConf.set(JobContext.TASK_ID, taskAttemptId.getTaskID().toString()); 
-    jobConf.set(JobContext.TASK_ATTEMPT_ID, taskAttemptId.toString());
+    jobConf.set(JobContext.TASK_ID, IDConverter
+        .toMRTaskAttemptId(taskAttemptId).toString());
+    jobConf.set(JobContext.TASK_ATTEMPT_ID,
+        IDConverter.toMRTaskAttemptId(taskAttemptId).toString());
     jobConf.setInt(JobContext.TASK_PARTITION, 
         taskAttemptId.getTaskID().getId());
     jobConf.set(JobContext.ID, taskAttemptId.getTaskID().getVertexID().getDAGId().toString());

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/cee79f63/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/map/MapProcessor.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/map/MapProcessor.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/map/MapProcessor.java
index a6ab986..3558739 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/map/MapProcessor.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/map/MapProcessor.java
@@ -69,11 +69,7 @@ public class MapProcessor extends MRTask implements Processor {
   public void initialize(Configuration conf, Master master) throws IOException,
   InterruptedException {
     super.initialize(conf, master);
-    TaskSplitMetaInfo[] allMetaInfo = readSplits();
-    TaskSplitMetaInfo thisTaskMetaInfo = allMetaInfo[tezEngineTaskContext
-        .getTaskAttemptId().getTaskID().getId()];
-    splitMetaInfo = new TaskSplitIndex(thisTaskMetaInfo.getSplitLocation(),
-        thisTaskMetaInfo.getStartOffset());
+    
   }
 
   @Override
@@ -81,6 +77,14 @@ public class MapProcessor extends MRTask implements Processor {
       final Input[] ins,
       final Output[] outs)
           throws IOException, InterruptedException {
+    
+    // Read split information.
+    TaskSplitMetaInfo[] allMetaInfo = readSplits();
+    TaskSplitMetaInfo thisTaskMetaInfo = allMetaInfo[tezEngineTaskContext
+        .getTaskAttemptId().getTaskID().getId()];
+    splitMetaInfo = new TaskSplitIndex(thisTaskMetaInfo.getSplitLocation(),
+        thisTaskMetaInfo.getStartOffset());
+    
     MRTaskReporter reporter = new MRTaskReporter(getTaskReporter());
     boolean useNewApi = jobConf.getUseNewMapper();
     initTask(jobConf, taskAttemptId.getTaskID().getVertexID().getDAGId(),

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/cee79f63/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/TestUmbilicalProtocol.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/TestUmbilicalProtocol.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/TestUmbilicalProtocol.java
index 025bf61..840cb31 100644
--- a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/TestUmbilicalProtocol.java
+++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/TestUmbilicalProtocol.java
@@ -111,7 +111,7 @@ public class TestUmbilicalProtocol implements TezTaskUmbilicalProtocol {
   @Override
   public boolean canCommit(TezTaskAttemptID taskid) throws IOException {
     LOG.info("Got 'can-commit' from " + taskid);
-    return false;
+    return true;
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/cee79f63/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/MapUtils.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/MapUtils.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/MapUtils.java
index aec6667..0b6bc5f 100644
--- a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/MapUtils.java
+++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/MapUtils.java
@@ -18,40 +18,37 @@
 
 package org.apache.tez.mapreduce.processor;
 
-import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.spy;
-
 import java.io.IOException;
-import java.util.Collections;
+import java.util.List;
 import java.util.Random;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.WritableUtils;
 import org.apache.hadoop.mapred.FileInputFormat;
 import org.apache.hadoop.mapred.FileSplit;
 import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.SequenceFileInputFormat;
-import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitIndex;
+import org.apache.hadoop.mapreduce.split.JobSplit;
+import org.apache.hadoop.mapreduce.split.JobSplit.SplitMetaInfo;
+import org.apache.hadoop.mapreduce.split.SplitMetaInfoReaderTez;
 import org.apache.tez.common.InputSpec;
 import org.apache.tez.common.OutputSpec;
 import org.apache.tez.common.TezEngineTaskContext;
+import org.apache.tez.common.TezJobConfig;
 import org.apache.tez.common.TezTaskUmbilicalProtocol;
-import org.apache.tez.engine.api.Input;
-import org.apache.tez.engine.api.Output;
-import org.apache.tez.engine.api.Processor;
 import org.apache.tez.engine.api.Task;
-import org.apache.tez.engine.lib.output.LocalOnFileSorterOutput;
 import org.apache.tez.engine.runtime.RuntimeUtils;
-import org.apache.tez.engine.task.RuntimeTask;
 import org.apache.tez.mapreduce.TezTestUtils;
-import org.apache.tez.mapreduce.input.SimpleInput;
+import org.apache.tez.mapreduce.hadoop.MRJobConfig;
 import org.apache.tez.mapreduce.processor.map.MapProcessor;
 
 public class MapUtils {
@@ -62,8 +59,7 @@ public class MapUtils {
   createInputSplit(FileSystem fs, Path workDir, JobConf job, Path file) 
       throws IOException {
     FileInputFormat.setInputPaths(job, workDir);
-  
-    
+
     // create a file with length entries
     SequenceFile.Writer writer = 
         SequenceFile.createWriter(fs, job, file, 
@@ -92,30 +88,59 @@ public class MapUtils {
         "file = " + ((FileSplit)splits[0]).getPath());
     return splits[0];
   }
+  
+  final private static FsPermission JOB_FILE_PERMISSION = FsPermission
+      .createImmutable((short) 0644); // rw-r--r--
+
+  // Will write files to PWD, from where they are read.
+  
+  private static void writeSplitFiles(FileSystem fs, JobConf conf,
+      InputSplit split) throws IOException {
+    Path jobSplitFile = new Path(conf.get(TezJobConfig.TASK_LOCAL_RESOURCE_DIR,
+        TezJobConfig.DEFAULT_TASK_LOCAL_RESOURCE_DIR), MRJobConfig.JOB_SPLIT);
+    FSDataOutputStream out = FileSystem.create(fs, jobSplitFile,
+        new FsPermission(JOB_FILE_PERMISSION));
+
+    long offset = out.getPos();
+    Text.writeString(out, split.getClass().getName());
+    split.write(out);
+    out.close();
+
+    String[] locations = split.getLocations();
+
+    SplitMetaInfo info = null;
+    info = new JobSplit.SplitMetaInfo(locations, offset, split.getLength());
+
+    Path jobSplitMetaInfoFile = new Path(
+        conf.get(TezJobConfig.TASK_LOCAL_RESOURCE_DIR),
+        MRJobConfig.JOB_SPLIT_METAINFO);
+
+    FSDataOutputStream outMeta = FileSystem.create(fs, jobSplitMetaInfoFile,
+        new FsPermission(JOB_FILE_PERMISSION));
+    outMeta.write(SplitMetaInfoReaderTez.META_SPLIT_FILE_HEADER);
+    WritableUtils.writeVInt(outMeta, SplitMetaInfoReaderTez.META_SPLIT_VERSION);
+    WritableUtils.writeVInt(outMeta, 1); // Only 1 split meta info being written
+    info.write(outMeta);
+    outMeta.close();
+  }
 
   public static Task runMapProcessor(FileSystem fs, Path workDir,
-      JobConf jobConf,
-      int mapId, Path mapInput,
+      JobConf jobConf, int mapId, Path mapInput,
       TezTaskUmbilicalProtocol umbilical,
-      Class<?> outputClazz) throws Exception {
+      String vertexName, List<InputSpec> inputSpecs,
+      List<OutputSpec> outputSpecs) throws Exception {
     jobConf.setInputFormat(SequenceFileInputFormat.class);
     InputSplit split = createInputSplit(fs, workDir, jobConf, mapInput);
-    TezEngineTaskContext taskContext = 
-        new TezEngineTaskContext(
-        TezTestUtils.getMockTaskAttemptId(0, 0, mapId, 0), "tez",
-        "tez", "TODO_vertexName", MapProcessor.class.getName(),
-        Collections.singletonList(new InputSpec("srcVertex", 0,
-            SimpleInput.class.getName())),
-        Collections.singletonList(new OutputSpec("targetVertex", 0,
-            outputClazz.getName())));
+
+    writeSplitFiles(fs, jobConf, split);
+    TezEngineTaskContext taskContext = new TezEngineTaskContext(
+        TezTestUtils.getMockTaskAttemptId(0, 0, mapId, 0), "testuser",
+        "testJob", vertexName, MapProcessor.class.getName(),
+        inputSpecs, outputSpecs);
 
     Task t = RuntimeUtils.createRuntimeTask(taskContext);
     t.initialize(jobConf, umbilical);
-    SimpleInput[] real = ((SimpleInput[])t.getInputs());
-    SimpleInput[] inputs = spy(real);
-    doReturn(split).when(inputs[0]).getOldSplitDetails(any(TaskSplitIndex.class));
-    t.getProcessor().process(inputs, t.getOutputs());
+    t.getProcessor().process(t.getInputs(), t.getOutputs());
     return t;
   }
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/cee79f63/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/map/TestMapProcessor.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/map/TestMapProcessor.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/map/TestMapProcessor.java
index f09340c..cda15fb 100644
--- a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/map/TestMapProcessor.java
+++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/map/TestMapProcessor.java
@@ -19,9 +19,11 @@ package org.apache.tez.mapreduce.processor.map;
 
 
 import java.io.IOException;
+import java.util.Collections;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.DataInputBuffer;
@@ -29,6 +31,8 @@ import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.tez.common.Constants;
+import org.apache.tez.common.InputSpec;
+import org.apache.tez.common.OutputSpec;
 import org.apache.tez.common.TezJobConfig;
 import org.apache.tez.engine.api.Task;
 import org.apache.tez.engine.common.sort.impl.IFile;
@@ -37,11 +41,15 @@ import org.apache.tez.engine.common.task.local.output.TezTaskOutput;
 import org.apache.tez.engine.lib.output.InMemorySortedOutput;
 import org.apache.tez.engine.lib.output.LocalOnFileSorterOutput;
 import org.apache.tez.mapreduce.TestUmbilicalProtocol;
+import org.apache.tez.mapreduce.hadoop.MultiStageMRConfToTezTranslator;
+import org.apache.tez.mapreduce.hadoop.MultiStageMRConfigUtil;
+import org.apache.tez.mapreduce.input.SimpleInput;
 import org.apache.tez.mapreduce.processor.MapUtils;
 import org.jboss.netty.buffer.BigEndianHeapChannelBuffer;
 import org.jboss.netty.buffer.ChannelBuffer;
 import org.jboss.netty.buffer.TruncatedChannelBuffer;
 import org.jboss.netty.handler.stream.ChunkedStream;
+import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Ignore;
@@ -49,9 +57,7 @@ import org.junit.Test;
 
 public class TestMapProcessor {
   
-  private static final Log LOG = LogFactory.getLog(TestMapProcessor.class);
-  
-  JobConf job;
+  private static final Log LOG = LogFactory.getLog(TestMapProcessor.class);  
   
   private static JobConf defaultConf = new JobConf();
   private static FileSystem localFs = null; 
@@ -63,32 +69,51 @@ public class TestMapProcessor {
       throw new RuntimeException("init failure", e);
     }
   }
+  @SuppressWarnings("deprecation")
   private static Path workDir =
     new Path(new Path(System.getProperty("test.build.data", "/tmp")),
              "TestMapProcessor").makeQualified(localFs);
 
   TezTaskOutput mapOutputs = new TezLocalTaskOutputFiles();
-  
-  @Before
-  public void setUp() {
-    job = new JobConf(defaultConf);
+
+  public void setUpJobConf(JobConf job) {
     job.set(TezJobConfig.LOCAL_DIR, workDir.toString());
     job.setClass(
         Constants.TEZ_ENGINE_TASK_OUTPUT_MANAGER,
         TezLocalTaskOutputFiles.class, 
         TezTaskOutput.class);
     job.setNumReduceTasks(1);
-    mapOutputs.setConf(job);
+  }
+  
+  @Before
+  @After
+  public void cleanup() throws Exception {
+    localFs.delete(workDir, true);
   }
   
   @Test
-  @Ignore
   public void testMapProcessor() throws Exception {
-    localFs.delete(workDir, true);
-    MapUtils.runMapProcessor(
-        localFs, workDir, job, 0, new Path(workDir, "map0"), 
-        new TestUmbilicalProtocol(),
-        LocalOnFileSorterOutput.class).close();
+    String vertexName = MultiStageMRConfigUtil.getInitialMapVertexName();
+    JobConf jobConf = new JobConf(defaultConf);
+    setUpJobConf(jobConf);
+    TezTaskOutput mapOutputs = new TezLocalTaskOutputFiles();
+    mapOutputs.setConf(jobConf);
+    
+    Configuration conf = MultiStageMRConfToTezTranslator.convertMRToLinearTez(jobConf);
+    Configuration stageConf = MultiStageMRConfigUtil.getConfForVertex(conf,
+        vertexName);
+    
+    JobConf job = new JobConf(stageConf);
+    
+    job.set(TezJobConfig.TASK_LOCAL_RESOURCE_DIR, new Path(workDir,
+        "localized-resources").toUri().toString());
+
+    MapUtils.runMapProcessor(localFs, workDir, job, 0,
+        new Path(workDir, "map0"), new TestUmbilicalProtocol(), vertexName,
+        Collections.singletonList(new InputSpec("NullVertex", 0,
+            SimpleInput.class.getName())),
+        Collections.singletonList(new OutputSpec("FakeVertex", 1,
+            LocalOnFileSorterOutput.class.getName()))).close();
 
     Path mapOutputFile = mapOutputs.getInputFile(0);
     LOG.info("mapOutputFile = " + mapOutputFile);
@@ -115,16 +140,34 @@ public class TestMapProcessor {
   @Test
   @Ignore
   public void testMapProcessorWithInMemSort() throws Exception {
+    
+    String vertexName = MultiStageMRConfigUtil.getInitialMapVertexName();
+    
     final int partitions = 2;
-    job.setNumReduceTasks(partitions);
-    job.setInt(TezJobConfig.TEZ_ENGINE_TASK_OUTDEGREE, partitions); 
+    JobConf jobConf = new JobConf(defaultConf);
+    jobConf.setNumReduceTasks(partitions);
+    setUpJobConf(jobConf);
+    TezTaskOutput mapOutputs = new TezLocalTaskOutputFiles();
+    mapOutputs.setConf(jobConf);
+    
+    Configuration conf = MultiStageMRConfToTezTranslator.convertMRToLinearTez(jobConf);
+    Configuration stageConf = MultiStageMRConfigUtil.getConfForVertex(conf,
+        vertexName);
+    
+    JobConf job = new JobConf(stageConf);
 
+    job.set(TezJobConfig.TASK_LOCAL_RESOURCE_DIR, new Path(workDir,
+        "localized-resources").toUri().toString());
     localFs.delete(workDir, true);
     Task t =
         MapUtils.runMapProcessor(
             localFs, workDir, job, 0, new Path(workDir, "map0"), 
-            new TestUmbilicalProtocol(true),
-            InMemorySortedOutput.class);
+            new TestUmbilicalProtocol(true), vertexName, 
+            Collections.singletonList(new InputSpec("NullVertex", 0,
+                SimpleInput.class.getName())),
+            Collections.singletonList(new OutputSpec("FakeVertex", 1,
+                InMemorySortedOutput.class.getName()))
+            );
     InMemorySortedOutput[] outputs = (InMemorySortedOutput[])t.getOutputs();
     
     verifyInMemSortedStream(outputs[0], 0, 4096);

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/cee79f63/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java
index afc21a7..69571e1 100644
--- a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java
+++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java
@@ -22,40 +22,44 @@ import java.util.Collections;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapred.FileOutputFormat;
 import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.JobContext;
+import org.apache.hadoop.mapred.SequenceFileOutputFormat;
 import org.apache.tez.common.Constants;
 import org.apache.tez.common.InputSpec;
 import org.apache.tez.common.OutputSpec;
 import org.apache.tez.common.TezEngineTaskContext;
 import org.apache.tez.common.TezJobConfig;
-import org.apache.tez.engine.api.Input;
-import org.apache.tez.engine.api.Output;
 import org.apache.tez.engine.api.Task;
 import org.apache.tez.engine.common.task.local.output.TezLocalTaskOutputFiles;
 import org.apache.tez.engine.common.task.local.output.TezTaskOutput;
 import org.apache.tez.engine.lib.input.LocalMergedInput;
 import org.apache.tez.engine.lib.output.LocalOnFileSorterOutput;
 import org.apache.tez.engine.runtime.RuntimeUtils;
-import org.apache.tez.engine.task.RuntimeTask;
 import org.apache.tez.mapreduce.TestUmbilicalProtocol;
 import org.apache.tez.mapreduce.TezTestUtils;
+import org.apache.tez.mapreduce.hadoop.IDConverter;
+import org.apache.tez.mapreduce.hadoop.MultiStageMRConfToTezTranslator;
+import org.apache.tez.mapreduce.hadoop.MultiStageMRConfigUtil;
+import org.apache.tez.mapreduce.input.SimpleInput;
 import org.apache.tez.mapreduce.output.SimpleOutput;
 import org.apache.tez.mapreduce.processor.MapUtils;
+import org.junit.After;
+import org.junit.Assert;
 import org.junit.Before;
-import org.junit.Ignore;
 import org.junit.Test;
 
 
 public class TestReduceProcessor {
   
   private static final Log LOG = LogFactory.getLog(TestReduceProcessor.class);
-  
-  JobConf job;
-  
+
   private static JobConf defaultConf = new JobConf();
   private static FileSystem localFs = null; 
   static {
@@ -66,13 +70,12 @@ public class TestReduceProcessor {
       throw new RuntimeException("init failure", e);
     }
   }
+  @SuppressWarnings("deprecation")
   private static Path workDir =
     new Path(new Path(System.getProperty("test.build.data", "/tmp")),
              "TestReduceProcessor").makeQualified(localFs);
 
-  @Before
-  public void setUp() {
-    job = new JobConf(defaultConf);
+  public void setUpJobConf(JobConf job) {
     job.set(TezJobConfig.LOCAL_DIR, workDir.toString());
     job.setClass(
         Constants.TEZ_ENGINE_TASK_OUTPUT_MANAGER,
@@ -80,37 +83,84 @@ public class TestReduceProcessor {
         TezTaskOutput.class);
     job.setNumReduceTasks(1);
   }
-  
+
+  @Before
+  @After
+  public void cleanup() throws Exception {
+    localFs.delete(workDir, true);
+  }
+
   @Test
-  @Ignore
   public void testReduceProcessor() throws Exception {
-    localFs.delete(workDir, true);
 
+    String mapVertexName = MultiStageMRConfigUtil.getInitialMapVertexName();
+    String reduceVertexName = MultiStageMRConfigUtil.getFinalReduceVertexName();
+    JobConf jobConf = new JobConf(defaultConf);
+    setUpJobConf(jobConf);
+    TezTaskOutput mapOutputs = new TezLocalTaskOutputFiles();
+    mapOutputs.setConf(jobConf);
+    
+    Configuration conf = MultiStageMRConfToTezTranslator.convertMRToLinearTez(jobConf);
+    Configuration mapStageConf = MultiStageMRConfigUtil.getConfForVertex(conf,
+        mapVertexName);
+    
+    JobConf mapConf = new JobConf(mapStageConf);
+    
+    mapConf.set(TezJobConfig.TASK_LOCAL_RESOURCE_DIR, new Path(workDir,
+        "localized-resources").toUri().toString());
+    
     // Run a map
-    MapUtils.runMapProcessor(
-        localFs, workDir, job, 0, new Path(workDir, "map0"), 
-        new TestUmbilicalProtocol(),
-        LocalOnFileSorterOutput.class);
+    MapUtils.runMapProcessor(localFs, workDir, mapConf, 0,
+        new Path(workDir, "map0"), new TestUmbilicalProtocol(), mapVertexName,
+        Collections.singletonList(new InputSpec("NullVertex", 0,
+            SimpleInput.class.getName())),
+        Collections.singletonList(new OutputSpec("FakeVertex", 1,
+            LocalOnFileSorterOutput.class.getName())));
 
     LOG.info("Starting reduce...");
-    FileOutputFormat.setOutputPath(job, new Path(workDir, "output"));
+    
+    
+    Configuration reduceStageConf = MultiStageMRConfigUtil.getConfForVertex(conf,
+        reduceVertexName);
+    JobConf reduceConf = new JobConf(reduceStageConf);
+    reduceConf.setOutputFormat(SequenceFileOutputFormat.class);
+    FileOutputFormat.setOutputPath(reduceConf, new Path(workDir, "output"));
     
     // Now run a reduce
     TezEngineTaskContext taskContext = new TezEngineTaskContext(
-        TezTestUtils.getMockTaskAttemptId(0, 0, 0, 0), "tez",
-        "tez", "TODO_vertexName", ReduceProcessor.class.getName(),
-        Collections.singletonList(new InputSpec("TODO_srcVertexName", 1,
+        TezTestUtils.getMockTaskAttemptId(0, 1, 0, 0), "testUser",
+        "testJob", reduceVertexName, ReduceProcessor.class.getName(),
+        Collections.singletonList(new InputSpec(mapVertexName, 1,
             LocalMergedInput.class.getName())),
-        Collections.singletonList(new OutputSpec("TODO_targetVertexName", 1,
+        Collections.singletonList(new OutputSpec("", 1,
                 SimpleOutput.class.getName())));
-            
-    job.set(JobContext.TASK_ATTEMPT_ID,
-        taskContext.getTaskAttemptId().toString());
+    
     Task t = RuntimeUtils.createRuntimeTask(taskContext);
-    t.initialize(job, new TestUmbilicalProtocol());
+    t.initialize(reduceConf, new TestUmbilicalProtocol());
     t.run();
     t.close();
+    
+    // Can this be done via some utility class ? MapOutputFile derivative, or
+    // instantiating the OutputCommitter
+    Path reduceOutputDir = new Path(new Path(workDir, "output"),
+        "_temporary/0/" + IDConverter
+            .toMRTaskId(taskContext.getTaskAttemptId().getTaskID()));
+    Path reduceOutputFile = new Path(reduceOutputDir, "part-00000");
+
+    @SuppressWarnings("deprecation")
+    SequenceFile.Reader reader = new SequenceFile.Reader(localFs, reduceOutputFile, reduceConf);
+
+    LongWritable key = new LongWritable();
+    Text value = new Text();
+    long prev = Long.MIN_VALUE;
+    while (reader.next(key, value)) {
+      if (prev != Long.MIN_VALUE) {
+        Assert.assertTrue(prev < key.get());
+        prev = key.get();
+      }
+    }
 
+    reader.close();
   }
 
 }