You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2013/09/25 09:31:18 UTC
[11/50] [abbrv] Rename *.new* packages back to what they should be and
remove dead code from the old packages, in the mapreduce module and the
tez-engine module (part of TEZ-398). (sseth)
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b4950f98/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/TestUmbilicalProtocol.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/TestUmbilicalProtocol.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/TestUmbilicalProtocol.java
index e5cc902..404dd8c 100644
--- a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/TestUmbilicalProtocol.java
+++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/TestUmbilicalProtocol.java
@@ -28,8 +28,8 @@ import org.apache.tez.common.ContainerTask;
import org.apache.tez.common.TezTaskUmbilicalProtocol;
import org.apache.tez.common.records.ProceedToCompletionResponse;
import org.apache.tez.dag.records.TezTaskAttemptID;
-import org.apache.tez.engine.newapi.impl.TezHeartbeatRequest;
-import org.apache.tez.engine.newapi.impl.TezHeartbeatResponse;
+import org.apache.tez.engine.api.impl.TezHeartbeatRequest;
+import org.apache.tez.engine.api.impl.TezHeartbeatResponse;
import org.apache.tez.engine.records.OutputContext;
import org.apache.tez.engine.records.TezTaskDependencyCompletionEventsUpdate;
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b4950f98/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/MapUtils.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/MapUtils.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/MapUtils.java
index 2bc327c..85e6653 100644
--- a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/MapUtils.java
+++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/MapUtils.java
@@ -44,14 +44,13 @@ import org.apache.hadoop.mapreduce.split.JobSplit;
import org.apache.hadoop.mapreduce.split.JobSplit.SplitMetaInfo;
import org.apache.hadoop.mapreduce.split.SplitMetaInfoReaderTez;
import org.apache.hadoop.util.DiskChecker.DiskErrorException;
-import org.apache.tez.common.InputSpec;
-import org.apache.tez.common.OutputSpec;
-import org.apache.tez.common.TezEngineTaskContext;
import org.apache.tez.common.TezJobConfig;
-import org.apache.tez.common.TezTaskUmbilicalProtocol;
import org.apache.tez.dag.api.ProcessorDescriptor;
-import org.apache.tez.engine.api.Task;
-import org.apache.tez.engine.runtime.RuntimeUtils;
+import org.apache.tez.engine.api.impl.InputSpec;
+import org.apache.tez.engine.api.impl.OutputSpec;
+import org.apache.tez.engine.api.impl.TaskSpec;
+import org.apache.tez.engine.api.impl.TezUmbilical;
+import org.apache.tez.engine.newruntime.LogicalIOProcessorRuntimeTask;
import org.apache.tez.mapreduce.TezTestUtils;
import org.apache.tez.mapreduce.hadoop.MRJobConfig;
import org.apache.tez.mapreduce.processor.map.MapProcessor;
@@ -174,9 +173,9 @@ public class MapUtils {
outMeta.close();
}
- public static Task runMapProcessor(FileSystem fs, Path workDir,
+ public static LogicalIOProcessorRuntimeTask runMapProcessor(FileSystem fs, Path workDir,
JobConf jobConf, int mapId, Path mapInput,
- TezTaskUmbilicalProtocol umbilical,
+ TezUmbilical umbilical,
String vertexName, List<InputSpec> inputSpecs,
List<OutputSpec> outputSpecs) throws Exception {
jobConf.setInputFormat(SequenceFileInputFormat.class);
@@ -185,14 +184,24 @@ public class MapUtils {
ProcessorDescriptor mapProcessorDesc = new ProcessorDescriptor(
MapProcessor.class.getName());
writeSplitFiles(fs, jobConf, split);
- TezEngineTaskContext taskContext = new TezEngineTaskContext(
- TezTestUtils.getMockTaskAttemptId(0, 0, mapId, 0), "testuser",
- "testJob", vertexName, mapProcessorDesc,
- inputSpecs, outputSpecs);
-
- Task t = RuntimeUtils.createRuntimeTask(taskContext);
- t.initialize(jobConf, null, umbilical);
- t.getProcessor().process(t.getInputs(), t.getOutputs());
- return t;
+
+ TaskSpec taskSpec = new TaskSpec(
+ TezTestUtils.getMockTaskAttemptId(0, 0, mapId, 0),
+ "testuser",
+ vertexName,
+ mapProcessorDesc,
+ inputSpecs,
+ outputSpecs);
+
+ // TODO NEWTEZ Fix umbilical access
+ LogicalIOProcessorRuntimeTask task = new LogicalIOProcessorRuntimeTask(
+ taskSpec,
+ 1,
+ jobConf,
+ umbilical,
+ null);
+ task.initialize();
+ task.run();
+ return task;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b4950f98/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/map/TestMapProcessor.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/map/TestMapProcessor.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/map/TestMapProcessor.java
index 84f1f81..2ecce8b 100644
--- a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/map/TestMapProcessor.java
+++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/map/TestMapProcessor.java
@@ -31,24 +31,24 @@ import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.tez.common.Constants;
-import org.apache.tez.common.InputSpec;
-import org.apache.tez.common.OutputSpec;
import org.apache.tez.common.TezJobConfig;
-import org.apache.tez.engine.api.Task;
+import org.apache.tez.dag.api.InputDescriptor;
+import org.apache.tez.dag.api.OutputDescriptor;
+import org.apache.tez.engine.api.impl.InputSpec;
+import org.apache.tez.engine.api.impl.OutputSpec;
+import org.apache.tez.engine.api.impl.TezUmbilical;
+import org.apache.tez.engine.common.InputAttemptIdentifier;
import org.apache.tez.engine.common.sort.impl.IFile;
import org.apache.tez.engine.common.task.local.output.TezLocalTaskOutputFiles;
import org.apache.tez.engine.common.task.local.output.TezTaskOutput;
-import org.apache.tez.engine.lib.oldoutput.OldLocalOnFileSorterOutput;
-import org.apache.tez.mapreduce.TestUmbilicalProtocol;
+import org.apache.tez.engine.lib.output.LocalOnFileSorterOutput;
+import org.apache.tez.engine.newruntime.LogicalIOProcessorRuntimeTask;
import org.apache.tez.mapreduce.hadoop.MRJobConfig;
import org.apache.tez.mapreduce.hadoop.MultiStageMRConfToTezTranslator;
import org.apache.tez.mapreduce.hadoop.MultiStageMRConfigUtil;
-import org.apache.tez.mapreduce.hadoop.mapreduce.TezNullOutputCommitter;
import org.apache.tez.mapreduce.input.SimpleInputLegacy;
-import org.apache.tez.mapreduce.processor.MRTask;
import org.apache.tez.mapreduce.processor.MapUtils;
import org.junit.After;
-import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
@@ -75,7 +75,7 @@ public class TestMapProcessor {
- TezTaskOutput mapOutputs = new TezLocalTaskOutputFiles();
+
public void setUpJobConf(JobConf job) {
job.set(TezJobConfig.LOCAL_DIRS, workDir.toString());
@@ -97,8 +97,7 @@ public class TestMapProcessor {
String vertexName = MultiStageMRConfigUtil.getInitialMapVertexName();
JobConf jobConf = new JobConf(defaultConf);
setUpJobConf(jobConf);
- TezTaskOutput mapOutputs = new TezLocalTaskOutputFiles();
- mapOutputs.setConf(jobConf);
+ TezTaskOutput mapOutputs = new TezLocalTaskOutputFiles(jobConf, "TODONEWTEZ_uniqueId");
Configuration conf = MultiStageMRConfToTezTranslator.convertMRToLinearTez(jobConf);
conf.setInt(MRJobConfig.APPLICATION_ATTEMPT_ID, 0);
@@ -111,20 +110,22 @@ public class TestMapProcessor {
job.set(TezJobConfig.TASK_LOCAL_RESOURCE_DIR, new Path(workDir,
"localized-resources").toUri().toString());
+ InputSpec mapInputSpec = new InputSpec("NullSrcVertex", new InputDescriptor(SimpleInputLegacy.class.getName()), 1);
+ OutputSpec mapOutputSpec = new OutputSpec("NullDestVertex", new OutputDescriptor(LocalOnFileSorterOutput.class.getName()), 1);
- Task t = MapUtils.runMapProcessor(localFs, workDir, job, 0,
- new Path(workDir, "map0"), new TestUmbilicalProtocol(), vertexName,
- Collections.singletonList(new InputSpec("NullVertex", 0,
- SimpleInputLegacy.class.getName())),
- Collections.singletonList(new OutputSpec("FakeVertex", 1,
- OldLocalOnFileSorterOutput.class.getName())));
+ // TODO NEWTEZ FIXME TezUmbilical handling
+ LogicalIOProcessorRuntimeTask t = MapUtils.runMapProcessor(localFs, workDir, job, 0,
+ new Path(workDir, "map0"), (TezUmbilical) null, vertexName,
+ Collections.singletonList(mapInputSpec),
+ Collections.singletonList(mapOutputSpec));
- MRTask mrTask = (MRTask)t.getProcessor();
- Assert.assertEquals(TezNullOutputCommitter.class.getName(), mrTask
- .getCommitter().getClass().getName());
- t.close();
+ // TODO NEWTEZ FIXME OutputCommitter verification
+// MRTask mrTask = (MRTask)t.getProcessor();
+// Assert.assertEquals(TezNullOutputCommitter.class.getName(), mrTask
+// .getCommitter().getClass().getName());
+// t.close();
- Path mapOutputFile = mapOutputs.getInputFile(0);
+ Path mapOutputFile = mapOutputs.getInputFile(new InputAttemptIdentifier(0, 0));
LOG.info("mapOutputFile = " + mapOutputFile);
IFile.Reader reader =
new IFile.Reader(job, localFs, mapOutputFile, null, null);
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b4950f98/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java
index 8bcd353..1d35f9b 100644
--- a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java
+++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java
@@ -32,18 +32,19 @@ import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.SequenceFileOutputFormat;
import org.apache.tez.common.Constants;
-import org.apache.tez.common.InputSpec;
-import org.apache.tez.common.OutputSpec;
-import org.apache.tez.common.TezEngineTaskContext;
import org.apache.tez.common.TezJobConfig;
+import org.apache.tez.dag.api.InputDescriptor;
+import org.apache.tez.dag.api.OutputDescriptor;
import org.apache.tez.dag.api.ProcessorDescriptor;
-import org.apache.tez.engine.api.Task;
+import org.apache.tez.engine.api.impl.InputSpec;
+import org.apache.tez.engine.api.impl.OutputSpec;
+import org.apache.tez.engine.api.impl.TaskSpec;
+import org.apache.tez.engine.api.impl.TezUmbilical;
import org.apache.tez.engine.common.task.local.output.TezLocalTaskOutputFiles;
import org.apache.tez.engine.common.task.local.output.TezTaskOutput;
-import org.apache.tez.engine.lib.oldinput.LocalMergedInput;
-import org.apache.tez.engine.lib.oldoutput.OldLocalOnFileSorterOutput;
-import org.apache.tez.engine.runtime.RuntimeUtils;
-import org.apache.tez.mapreduce.TestUmbilicalProtocol;
+import org.apache.tez.engine.lib.input.LocalMergedInput;
+import org.apache.tez.engine.lib.output.LocalOnFileSorterOutput;
+import org.apache.tez.engine.newruntime.LogicalIOProcessorRuntimeTask;
import org.apache.tez.mapreduce.TezTestUtils;
import org.apache.tez.mapreduce.hadoop.IDConverter;
import org.apache.tez.mapreduce.hadoop.MRJobConfig;
@@ -51,7 +52,6 @@ import org.apache.tez.mapreduce.hadoop.MultiStageMRConfToTezTranslator;
import org.apache.tez.mapreduce.hadoop.MultiStageMRConfigUtil;
import org.apache.tez.mapreduce.input.SimpleInputLegacy;
import org.apache.tez.mapreduce.output.SimpleOutput;
-import org.apache.tez.mapreduce.processor.MRTask;
import org.apache.tez.mapreduce.processor.MapUtils;
import org.apache.tez.mapreduce.processor.reduce.ReduceProcessor;
import org.junit.After;
@@ -104,8 +104,7 @@ public class TestReduceProcessor {
String reduceVertexName = MultiStageMRConfigUtil.getFinalReduceVertexName();
JobConf jobConf = new JobConf(defaultConf);
setUpJobConf(jobConf);
- TezTaskOutput mapOutputs = new TezLocalTaskOutputFiles();
- mapOutputs.setConf(jobConf);
+ TezTaskOutput mapOutputs = new TezLocalTaskOutputFiles(jobConf, "TODONEWTEZ_uniqueId");
Configuration conf = MultiStageMRConfToTezTranslator.convertMRToLinearTez(jobConf);
conf.setInt(MRJobConfig.APPLICATION_ATTEMPT_ID, 0);
@@ -117,14 +116,16 @@ public class TestReduceProcessor {
mapConf.set(TezJobConfig.TASK_LOCAL_RESOURCE_DIR, new Path(workDir,
"localized-resources").toUri().toString());
-
+ InputSpec mapInputSpec = new InputSpec("NullSrcVertex", new InputDescriptor(
+ SimpleInputLegacy.class.getName()), 0);
+ OutputSpec mapOutputSpec = new OutputSpec("NullDestVertex", new OutputDescriptor(
+ LocalOnFileSorterOutput.class.getName()), 1);
// Run a map
+ // TODO NEWTEZ FIX Umbilical creation
MapUtils.runMapProcessor(localFs, workDir, mapConf, 0,
- new Path(workDir, "map0"), new TestUmbilicalProtocol(), mapVertexName,
- Collections.singletonList(new InputSpec("NullVertex", 0,
- SimpleInputLegacy.class.getName())),
- Collections.singletonList(new OutputSpec("FakeVertex", 1,
- OldLocalOnFileSorterOutput.class.getName())));
+ new Path(workDir, "map0"), (TezUmbilical) null, mapVertexName,
+ Collections.singletonList(mapInputSpec),
+ Collections.singletonList(mapOutputSpec));
LOG.info("Starting reduce...");
@@ -138,28 +139,43 @@ public class TestReduceProcessor {
FileOutputFormat.setOutputPath(reduceConf, new Path(workDir, "output"));
ProcessorDescriptor reduceProcessorDesc = new ProcessorDescriptor(
ReduceProcessor.class.getName());
+
+ InputSpec reduceInputSpec = new InputSpec(mapVertexName, new InputDescriptor(LocalMergedInput.class.getName()), 1);
+ OutputSpec reduceOutputSpec = new OutputSpec("NullDestinationVertex", new OutputDescriptor(SimpleOutput.class.getName()), 1);
+
// Now run a reduce
- TezEngineTaskContext taskContext = new TezEngineTaskContext(
- TezTestUtils.getMockTaskAttemptId(0, 1, 0, 0), "testUser",
- "testJob", reduceVertexName, reduceProcessorDesc,
- Collections.singletonList(new InputSpec(mapVertexName, 1,
- LocalMergedInput.class.getName())),
- Collections.singletonList(new OutputSpec("", 1,
- SimpleOutput.class.getName())));
+ TaskSpec taskSpec = new TaskSpec(
+ TezTestUtils.getMockTaskAttemptId(0, 1, 0, 0),
+ "testUser",
+ reduceVertexName,
+ reduceProcessorDesc,
+ Collections.singletonList(reduceInputSpec),
+ Collections.singletonList(reduceOutputSpec));
+
+ // TODO NEWTEZ FIXME Umbilical and jobToken
+ LogicalIOProcessorRuntimeTask task = new LogicalIOProcessorRuntimeTask(
+ taskSpec,
+ 1,
+ reduceConf,
+ (TezUmbilical) null,
+ null);
- Task t = RuntimeUtils.createRuntimeTask(taskContext);
- t.initialize(reduceConf, null, new TestUmbilicalProtocol());
- t.run();
- MRTask mrTask = (MRTask)t.getProcessor();
+ task.initialize();
+ task.run();
+
+// MRTask mrTask = (MRTask)t.getProcessor();
// TODO NEWTEZ Verify the partitioner has been created
// Assert.assertNull(mrTask.getPartitioner());
- t.close();
+ task.close();
// Can this be done via some utility class ? MapOutputFile derivative, or
// instantiating the OutputCommitter
+
+
+ // TODO NEWTEZ FIXME uniqueId generation and event generation (mockTaskId will not work here)
Path reduceOutputDir = new Path(new Path(workDir, "output"),
"_temporary/0/" + IDConverter
- .toMRTaskId(taskContext.getTaskAttemptId().getTaskID()));
+ .toMRTaskId(TezTestUtils.getMockTaskId(0, 1, 0)));
Path reduceOutputFile = new Path(reduceOutputDir, "part-00000");
SequenceFile.Reader reader = new SequenceFile.Reader(localFs,