You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2013/09/25 09:31:18 UTC

[11/50] [abbrv] Rename *.new* packages back to what they should be, remove dead code from the old packages - mapreduce module - tez-engine module (part of TEZ-398). (sseth)

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b4950f98/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/TestUmbilicalProtocol.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/TestUmbilicalProtocol.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/TestUmbilicalProtocol.java
index e5cc902..404dd8c 100644
--- a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/TestUmbilicalProtocol.java
+++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/TestUmbilicalProtocol.java
@@ -28,8 +28,8 @@ import org.apache.tez.common.ContainerTask;
 import org.apache.tez.common.TezTaskUmbilicalProtocol;
 import org.apache.tez.common.records.ProceedToCompletionResponse;
 import org.apache.tez.dag.records.TezTaskAttemptID;
-import org.apache.tez.engine.newapi.impl.TezHeartbeatRequest;
-import org.apache.tez.engine.newapi.impl.TezHeartbeatResponse;
+import org.apache.tez.engine.api.impl.TezHeartbeatRequest;
+import org.apache.tez.engine.api.impl.TezHeartbeatResponse;
 import org.apache.tez.engine.records.OutputContext;
 import org.apache.tez.engine.records.TezTaskDependencyCompletionEventsUpdate;
 

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b4950f98/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/MapUtils.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/MapUtils.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/MapUtils.java
index 2bc327c..85e6653 100644
--- a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/MapUtils.java
+++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/MapUtils.java
@@ -44,14 +44,13 @@ import org.apache.hadoop.mapreduce.split.JobSplit;
 import org.apache.hadoop.mapreduce.split.JobSplit.SplitMetaInfo;
 import org.apache.hadoop.mapreduce.split.SplitMetaInfoReaderTez;
 import org.apache.hadoop.util.DiskChecker.DiskErrorException;
-import org.apache.tez.common.InputSpec;
-import org.apache.tez.common.OutputSpec;
-import org.apache.tez.common.TezEngineTaskContext;
 import org.apache.tez.common.TezJobConfig;
-import org.apache.tez.common.TezTaskUmbilicalProtocol;
 import org.apache.tez.dag.api.ProcessorDescriptor;
-import org.apache.tez.engine.api.Task;
-import org.apache.tez.engine.runtime.RuntimeUtils;
+import org.apache.tez.engine.api.impl.InputSpec;
+import org.apache.tez.engine.api.impl.OutputSpec;
+import org.apache.tez.engine.api.impl.TaskSpec;
+import org.apache.tez.engine.api.impl.TezUmbilical;
+import org.apache.tez.engine.newruntime.LogicalIOProcessorRuntimeTask;
 import org.apache.tez.mapreduce.TezTestUtils;
 import org.apache.tez.mapreduce.hadoop.MRJobConfig;
 import org.apache.tez.mapreduce.processor.map.MapProcessor;
@@ -174,9 +173,9 @@ public class MapUtils {
     outMeta.close();
   }
 
-  public static Task runMapProcessor(FileSystem fs, Path workDir,
+  public static LogicalIOProcessorRuntimeTask runMapProcessor(FileSystem fs, Path workDir,
       JobConf jobConf, int mapId, Path mapInput,
-      TezTaskUmbilicalProtocol umbilical,
+      TezUmbilical umbilical,
       String vertexName, List<InputSpec> inputSpecs,
       List<OutputSpec> outputSpecs) throws Exception {
     jobConf.setInputFormat(SequenceFileInputFormat.class);
@@ -185,14 +184,24 @@ public class MapUtils {
     ProcessorDescriptor mapProcessorDesc = new ProcessorDescriptor(
         MapProcessor.class.getName());
     writeSplitFiles(fs, jobConf, split);
-    TezEngineTaskContext taskContext = new TezEngineTaskContext(
-        TezTestUtils.getMockTaskAttemptId(0, 0, mapId, 0), "testuser",
-        "testJob", vertexName, mapProcessorDesc,
-        inputSpecs, outputSpecs);
-
-    Task t = RuntimeUtils.createRuntimeTask(taskContext);
-    t.initialize(jobConf, null, umbilical);
-    t.getProcessor().process(t.getInputs(), t.getOutputs());
-    return t;
+
+    TaskSpec taskSpec = new TaskSpec(
+        TezTestUtils.getMockTaskAttemptId(0, 0, mapId, 0),
+        "testuser",
+        vertexName,
+        mapProcessorDesc,
+        inputSpecs,
+        outputSpecs);
+    
+    // TODO NEWTEZ Fix umbilical access
+    LogicalIOProcessorRuntimeTask task = new LogicalIOProcessorRuntimeTask(
+        taskSpec,
+        1,
+        jobConf,
+        umbilical,
+        null);
+    task.initialize();
+    task.run();
+    return task;
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b4950f98/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/map/TestMapProcessor.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/map/TestMapProcessor.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/map/TestMapProcessor.java
index 84f1f81..2ecce8b 100644
--- a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/map/TestMapProcessor.java
+++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/map/TestMapProcessor.java
@@ -31,24 +31,24 @@ import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.tez.common.Constants;
-import org.apache.tez.common.InputSpec;
-import org.apache.tez.common.OutputSpec;
 import org.apache.tez.common.TezJobConfig;
-import org.apache.tez.engine.api.Task;
+import org.apache.tez.dag.api.InputDescriptor;
+import org.apache.tez.dag.api.OutputDescriptor;
+import org.apache.tez.engine.api.impl.InputSpec;
+import org.apache.tez.engine.api.impl.OutputSpec;
+import org.apache.tez.engine.api.impl.TezUmbilical;
+import org.apache.tez.engine.common.InputAttemptIdentifier;
 import org.apache.tez.engine.common.sort.impl.IFile;
 import org.apache.tez.engine.common.task.local.output.TezLocalTaskOutputFiles;
 import org.apache.tez.engine.common.task.local.output.TezTaskOutput;
-import org.apache.tez.engine.lib.oldoutput.OldLocalOnFileSorterOutput;
-import org.apache.tez.mapreduce.TestUmbilicalProtocol;
+import org.apache.tez.engine.lib.output.LocalOnFileSorterOutput;
+import org.apache.tez.engine.newruntime.LogicalIOProcessorRuntimeTask;
 import org.apache.tez.mapreduce.hadoop.MRJobConfig;
 import org.apache.tez.mapreduce.hadoop.MultiStageMRConfToTezTranslator;
 import org.apache.tez.mapreduce.hadoop.MultiStageMRConfigUtil;
-import org.apache.tez.mapreduce.hadoop.mapreduce.TezNullOutputCommitter;
 import org.apache.tez.mapreduce.input.SimpleInputLegacy;
-import org.apache.tez.mapreduce.processor.MRTask;
 import org.apache.tez.mapreduce.processor.MapUtils;
 import org.junit.After;
-import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -75,7 +75,7 @@ public class TestMapProcessor {
   
 
 
-  TezTaskOutput mapOutputs = new TezLocalTaskOutputFiles();
+  
 
   public void setUpJobConf(JobConf job) {
     job.set(TezJobConfig.LOCAL_DIRS, workDir.toString());
@@ -97,8 +97,7 @@ public class TestMapProcessor {
     String vertexName = MultiStageMRConfigUtil.getInitialMapVertexName();
     JobConf jobConf = new JobConf(defaultConf);
     setUpJobConf(jobConf);
-    TezTaskOutput mapOutputs = new TezLocalTaskOutputFiles();
-    mapOutputs.setConf(jobConf);
+    TezTaskOutput mapOutputs = new TezLocalTaskOutputFiles(jobConf, "TODONEWTEZ_uniqueId");
 
     Configuration conf = MultiStageMRConfToTezTranslator.convertMRToLinearTez(jobConf);
     conf.setInt(MRJobConfig.APPLICATION_ATTEMPT_ID, 0);
@@ -111,20 +110,22 @@ public class TestMapProcessor {
     job.set(TezJobConfig.TASK_LOCAL_RESOURCE_DIR, new Path(workDir,
         "localized-resources").toUri().toString());
     
+    InputSpec mapInputSpec = new InputSpec("NullSrcVertex", new InputDescriptor(SimpleInputLegacy.class.getName()), 1);
+    OutputSpec mapOutputSpec = new OutputSpec("NullDestVertex", new OutputDescriptor(LocalOnFileSorterOutput.class.getName()), 1);
 
-    Task t = MapUtils.runMapProcessor(localFs, workDir, job, 0,
-        new Path(workDir, "map0"), new TestUmbilicalProtocol(), vertexName,
-        Collections.singletonList(new InputSpec("NullVertex", 0,
-            SimpleInputLegacy.class.getName())),
-        Collections.singletonList(new OutputSpec("FakeVertex", 1,
-            OldLocalOnFileSorterOutput.class.getName())));
+    // TODO NEWTEZ FIXME TezUmbilical handling
+    LogicalIOProcessorRuntimeTask t = MapUtils.runMapProcessor(localFs, workDir, job, 0,
+        new Path(workDir, "map0"), (TezUmbilical) null, vertexName,
+        Collections.singletonList(mapInputSpec),
+        Collections.singletonList(mapOutputSpec));
 
-    MRTask mrTask = (MRTask)t.getProcessor();
-    Assert.assertEquals(TezNullOutputCommitter.class.getName(), mrTask
-        .getCommitter().getClass().getName());
-    t.close();
+    // TODO NEWTEZ FIXME OutputCommitter verification
+//    MRTask mrTask = (MRTask)t.getProcessor();
+//    Assert.assertEquals(TezNullOutputCommitter.class.getName(), mrTask
+//        .getCommitter().getClass().getName());
+//    t.close();
 
-    Path mapOutputFile = mapOutputs.getInputFile(0);
+    Path mapOutputFile = mapOutputs.getInputFile(new InputAttemptIdentifier(0, 0));
     LOG.info("mapOutputFile = " + mapOutputFile);
     IFile.Reader reader =
         new IFile.Reader(job, localFs, mapOutputFile, null, null);

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b4950f98/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java
index 8bcd353..1d35f9b 100644
--- a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java
+++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java
@@ -32,18 +32,19 @@ import org.apache.hadoop.mapred.FileOutputFormat;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.SequenceFileOutputFormat;
 import org.apache.tez.common.Constants;
-import org.apache.tez.common.InputSpec;
-import org.apache.tez.common.OutputSpec;
-import org.apache.tez.common.TezEngineTaskContext;
 import org.apache.tez.common.TezJobConfig;
+import org.apache.tez.dag.api.InputDescriptor;
+import org.apache.tez.dag.api.OutputDescriptor;
 import org.apache.tez.dag.api.ProcessorDescriptor;
-import org.apache.tez.engine.api.Task;
+import org.apache.tez.engine.api.impl.InputSpec;
+import org.apache.tez.engine.api.impl.OutputSpec;
+import org.apache.tez.engine.api.impl.TaskSpec;
+import org.apache.tez.engine.api.impl.TezUmbilical;
 import org.apache.tez.engine.common.task.local.output.TezLocalTaskOutputFiles;
 import org.apache.tez.engine.common.task.local.output.TezTaskOutput;
-import org.apache.tez.engine.lib.oldinput.LocalMergedInput;
-import org.apache.tez.engine.lib.oldoutput.OldLocalOnFileSorterOutput;
-import org.apache.tez.engine.runtime.RuntimeUtils;
-import org.apache.tez.mapreduce.TestUmbilicalProtocol;
+import org.apache.tez.engine.lib.input.LocalMergedInput;
+import org.apache.tez.engine.lib.output.LocalOnFileSorterOutput;
+import org.apache.tez.engine.newruntime.LogicalIOProcessorRuntimeTask;
 import org.apache.tez.mapreduce.TezTestUtils;
 import org.apache.tez.mapreduce.hadoop.IDConverter;
 import org.apache.tez.mapreduce.hadoop.MRJobConfig;
@@ -51,7 +52,6 @@ import org.apache.tez.mapreduce.hadoop.MultiStageMRConfToTezTranslator;
 import org.apache.tez.mapreduce.hadoop.MultiStageMRConfigUtil;
 import org.apache.tez.mapreduce.input.SimpleInputLegacy;
 import org.apache.tez.mapreduce.output.SimpleOutput;
-import org.apache.tez.mapreduce.processor.MRTask;
 import org.apache.tez.mapreduce.processor.MapUtils;
 import org.apache.tez.mapreduce.processor.reduce.ReduceProcessor;
 import org.junit.After;
@@ -104,8 +104,7 @@ public class TestReduceProcessor {
     String reduceVertexName = MultiStageMRConfigUtil.getFinalReduceVertexName();
     JobConf jobConf = new JobConf(defaultConf);
     setUpJobConf(jobConf);
-    TezTaskOutput mapOutputs = new TezLocalTaskOutputFiles();
-    mapOutputs.setConf(jobConf);
+    TezTaskOutput mapOutputs = new TezLocalTaskOutputFiles(jobConf, "TODONEWTEZ_uniqueId");
     
     Configuration conf = MultiStageMRConfToTezTranslator.convertMRToLinearTez(jobConf);
     conf.setInt(MRJobConfig.APPLICATION_ATTEMPT_ID, 0);
@@ -117,14 +116,16 @@ public class TestReduceProcessor {
     mapConf.set(TezJobConfig.TASK_LOCAL_RESOURCE_DIR, new Path(workDir,
         "localized-resources").toUri().toString());
     
-    
+    InputSpec mapInputSpec = new InputSpec("NullSrcVertex", new InputDescriptor(
+        SimpleInputLegacy.class.getName()), 0);
+    OutputSpec mapOutputSpec = new OutputSpec("NullDestVertex", new OutputDescriptor(
+        LocalOnFileSorterOutput.class.getName()), 1);
     // Run a map
+    // TODO NEWTEZ FIX Umbilical creation
     MapUtils.runMapProcessor(localFs, workDir, mapConf, 0,
-        new Path(workDir, "map0"), new TestUmbilicalProtocol(), mapVertexName,
-        Collections.singletonList(new InputSpec("NullVertex", 0,
-            SimpleInputLegacy.class.getName())),
-        Collections.singletonList(new OutputSpec("FakeVertex", 1,
-            OldLocalOnFileSorterOutput.class.getName())));
+        new Path(workDir, "map0"), (TezUmbilical) null, mapVertexName,
+        Collections.singletonList(mapInputSpec),
+        Collections.singletonList(mapOutputSpec));
 
     LOG.info("Starting reduce...");
     
@@ -138,28 +139,43 @@ public class TestReduceProcessor {
     FileOutputFormat.setOutputPath(reduceConf, new Path(workDir, "output"));
     ProcessorDescriptor reduceProcessorDesc = new ProcessorDescriptor(
         ReduceProcessor.class.getName());
+    
+    InputSpec reduceInputSpec = new InputSpec(mapVertexName, new InputDescriptor(LocalMergedInput.class.getName()), 1);
+    OutputSpec reduceOutputSpec = new OutputSpec("NullDestinationVertex", new OutputDescriptor(SimpleOutput.class.getName()), 1);
+    
     // Now run a reduce
-    TezEngineTaskContext taskContext = new TezEngineTaskContext(
-        TezTestUtils.getMockTaskAttemptId(0, 1, 0, 0), "testUser",
-        "testJob", reduceVertexName, reduceProcessorDesc,
-        Collections.singletonList(new InputSpec(mapVertexName, 1,
-            LocalMergedInput.class.getName())),
-        Collections.singletonList(new OutputSpec("", 1,
-                SimpleOutput.class.getName())));
+    TaskSpec taskSpec = new TaskSpec(
+        TezTestUtils.getMockTaskAttemptId(0, 1, 0, 0),
+        "testUser",
+        reduceVertexName,
+        reduceProcessorDesc,
+        Collections.singletonList(reduceInputSpec),
+        Collections.singletonList(reduceOutputSpec));
+    
+    // TODO NEWTEZ FIXME Umbilical and jobToken
+    LogicalIOProcessorRuntimeTask task = new LogicalIOProcessorRuntimeTask(
+        taskSpec,
+        1,
+        reduceConf,
+        (TezUmbilical) null,
+        null);
     
-    Task t = RuntimeUtils.createRuntimeTask(taskContext);
-    t.initialize(reduceConf, null, new TestUmbilicalProtocol());
-    t.run();
-    MRTask mrTask = (MRTask)t.getProcessor();
+    task.initialize();
+    task.run();
+    
+//    MRTask mrTask = (MRTask)t.getProcessor();
 //    TODO NEWTEZ Verify the partitioner has been created
 //    Assert.assertNull(mrTask.getPartitioner());
-    t.close();
+    task.close();
     
     // Can this be done via some utility class ? MapOutputFile derivative, or
     // instantiating the OutputCommitter
+    
+
+    // TODO NEWTEZ FIXME uniqueId generation and event generation (mockTaskId will not work here)
     Path reduceOutputDir = new Path(new Path(workDir, "output"),
         "_temporary/0/" + IDConverter
-            .toMRTaskId(taskContext.getTaskAttemptId().getTaskID()));
+            .toMRTaskId(TezTestUtils.getMockTaskId(0, 1, 0)));
     Path reduceOutputFile = new Path(reduceOutputDir, "part-00000");
 
     SequenceFile.Reader reader = new SequenceFile.Reader(localFs,