You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2013/05/17 09:03:57 UTC
git commit: TEZ-62. Fix and enable tests in TestMapProcessor and
TestReduceProcessor. (sseth)
Updated Branches:
refs/heads/TEZ-1 38b410e31 -> cee79f638
TEZ-62. Fix and enable tests in TestMapProcessor and
TestReduceProcessor. (sseth)
Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/cee79f63
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/cee79f63
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/cee79f63
Branch: refs/heads/TEZ-1
Commit: cee79f6384940ca226cd0c2ef02e024f19e7550a
Parents: 38b410e
Author: Siddharth Seth <ss...@apache.org>
Authored: Fri May 17 00:03:20 2013 -0700
Committer: Siddharth Seth <ss...@apache.org>
Committed: Fri May 17 00:03:20 2013 -0700
----------------------------------------------------------------------
.../java/org/apache/tez/common/TezJobConfig.java | 7 +
.../org/apache/hadoop/mapred/YarnTezDagChild.java | 10 +-
.../apache/tez/common/TezEngineTaskContext.java | 4 +-
.../mapreduce/split/SplitMetaInfoReaderTez.java | 14 ++-
.../tez/mapreduce/hadoop/TezTypeConverters.java | 2 +
.../org/apache/tez/mapreduce/processor/MRTask.java | 8 +-
.../tez/mapreduce/processor/map/MapProcessor.java | 14 ++-
.../tez/mapreduce/TestUmbilicalProtocol.java | 2 +-
.../apache/tez/mapreduce/processor/MapUtils.java | 85 ++++++++----
.../mapreduce/processor/map/TestMapProcessor.java | 79 +++++++++---
.../processor/reduce/TestReduceProcessor.java | 104 +++++++++++----
11 files changed, 236 insertions(+), 93 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/cee79f63/tez-common/src/main/java/org/apache/tez/common/TezJobConfig.java
----------------------------------------------------------------------
diff --git a/tez-common/src/main/java/org/apache/tez/common/TezJobConfig.java b/tez-common/src/main/java/org/apache/tez/common/TezJobConfig.java
index e867458..4a7abea 100644
--- a/tez-common/src/main/java/org/apache/tez/common/TezJobConfig.java
+++ b/tez-common/src/main/java/org/apache/tez/common/TezJobConfig.java
@@ -68,6 +68,13 @@ public class TezJobConfig {
public static final String DEFAULT_LOCAL_DIR = "/tmp";
/**
+ * The directory which contains the localized files for this task.
+ */
+ @Private
+ public static final String TASK_LOCAL_RESOURCE_DIR = "tez.engine.task-local-resource.dir";
+ public static final String DEFAULT_TASK_LOCAL_RESOURCE_DIR = "/tmp";
+
+ /**
*
*/
public static final String TEZ_ENGINE_TASK_INDEGREE =
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/cee79f63/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java b/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java
index f140c3a..2295b6a 100644
--- a/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java
+++ b/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java
@@ -249,12 +249,16 @@ public class YarnTezDagChild {
* out an output directory.
* @throws IOException
*/
- private static void configureLocalDirs(MRTask task, JobConf job) throws IOException {
+ private static void configureLocalDirs(JobConf job) throws IOException {
String[] localSysDirs = StringUtils.getTrimmedStrings(
System.getenv(Environment.LOCAL_DIRS.name()));
job.setStrings(TezJobConfig.LOCAL_DIR, localSysDirs);
+ job.set(TezJobConfig.TASK_LOCAL_RESOURCE_DIR,
+ System.getenv(Environment.PWD.name()));
LOG.info(TezJobConfig.LOCAL_DIR + " for child: " +
job.get(TezJobConfig.LOCAL_DIR));
+ LOG.info(TezJobConfig.TASK_LOCAL_RESOURCE_DIR + " for child: "
+ + job.get(TezJobConfig.TASK_LOCAL_RESOURCE_DIR));
LocalDirAllocator lDirAlloc = new LocalDirAllocator(TezJobConfig.LOCAL_DIR);
Path workDir = null;
// First, try to find the JOB_LOCAL_DIR on this host.
@@ -285,7 +289,7 @@ public class YarnTezDagChild {
}
}
// TODO TEZ This likely needs fixing to make sure things work when there are multiple local-dirs etc.
- job.set(MRJobConfig.JOB_LOCAL_DIR,workDir.toString());
+ job.set(MRJobConfig.JOB_LOCAL_DIR, workDir.toString());
}
private static JobConf configureTask(MRTask task, Credentials credentials,
@@ -308,7 +312,7 @@ public class YarnTezDagChild {
// JobTokenSecretManager.createSecretKey(jt.getPassword()));
// setup the child's MRConfig.LOCAL_DIR.
- configureLocalDirs(task, job);
+ configureLocalDirs(job);
// setup the child's attempt directories
// Do the task-type specific localization
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/cee79f63/tez-engine/src/main/java/org/apache/tez/common/TezEngineTaskContext.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/common/TezEngineTaskContext.java b/tez-engine/src/main/java/org/apache/tez/common/TezEngineTaskContext.java
index 350702a..5823de6 100644
--- a/tez-engine/src/main/java/org/apache/tez/common/TezEngineTaskContext.java
+++ b/tez-engine/src/main/java/org/apache/tez/common/TezEngineTaskContext.java
@@ -39,7 +39,7 @@ public class TezEngineTaskContext extends TezTaskContext {
}
public TezEngineTaskContext(TezTaskAttemptID taskAttemptID, String user,
- String jobName, String vertexName, String moduleClassName,
+ String jobName, String vertexName, String processorName,
List<InputSpec> inputSpecList, List<OutputSpec> outputSpecList) {
super(taskAttemptID, user, jobName, vertexName);
this.inputSpecList = inputSpecList;
@@ -52,7 +52,7 @@ public class TezEngineTaskContext extends TezTaskContext {
}
this.inputSpecList = inputSpecList;
this.outputSpecList = outputSpecList;
- this.processorName = moduleClassName;
+ this.processorName = processorName;
}
public String getProcessorName() {
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/cee79f63/tez-mapreduce/src/main/java/org/apache/hadoop/mapreduce/split/SplitMetaInfoReaderTez.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/hadoop/mapreduce/split/SplitMetaInfoReaderTez.java b/tez-mapreduce/src/main/java/org/apache/hadoop/mapreduce/split/SplitMetaInfoReaderTez.java
index de8d972..72246c0 100644
--- a/tez-mapreduce/src/main/java/org/apache/hadoop/mapreduce/split/SplitMetaInfoReaderTez.java
+++ b/tez-mapreduce/src/main/java/org/apache/hadoop/mapreduce/split/SplitMetaInfoReaderTez.java
@@ -33,6 +33,7 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
+import org.apache.tez.common.TezJobConfig;
import org.apache.tez.mapreduce.hadoop.MRJobConfig;
/**
@@ -44,6 +45,9 @@ public class SplitMetaInfoReaderTez {
public static final Log LOG = LogFactory.getLog(SplitMetaInfoReaderTez.class);
+ public static final int META_SPLIT_VERSION = JobSplit.META_SPLIT_VERSION;
+ public static final byte[] META_SPLIT_FILE_HEADER = JobSplit.META_SPLIT_FILE_HEADER;
+
// Forked from the MR variant so that the metaInfo file as well as the split
// file can be read from local fs - relying on these files being localized.
public static TaskSplitMetaInfo[] readSplitMetaInfo(Configuration conf,
@@ -53,9 +57,11 @@ public class SplitMetaInfoReaderTez {
MRJobConfig.SPLIT_METAINFO_MAXSIZE,
MRJobConfig.DEFAULT_SPLIT_METAINFO_MAXSIZE);
- Path metaSplitFile = new Path(MRJobConfig.JOB_SPLIT_METAINFO);
+ Path metaSplitFile = new Path(
+ conf.get(TezJobConfig.TASK_LOCAL_RESOURCE_DIR),
+ MRJobConfig.JOB_SPLIT_METAINFO);
String jobSplitFile = MRJobConfig.JOB_SPLIT;
-
+
File file = new File(metaSplitFile.toUri().getPath()).getAbsoluteFile();
LOG.info("DEBUG: Setting up JobSplitIndex with JobSplitFile at: "
+ file.getAbsolutePath() + ", defaultFS from conf: "
@@ -83,12 +89,12 @@ public class SplitMetaInfoReaderTez {
JobSplit.SplitMetaInfo splitMetaInfo = new JobSplit.SplitMetaInfo();
splitMetaInfo.readFields(in);
JobSplit.TaskSplitIndex splitIndex = new JobSplit.TaskSplitIndex(
- jobSplitFile, splitMetaInfo.getStartOffset());
+ new Path(conf.get(TezJobConfig.TASK_LOCAL_RESOURCE_DIR), jobSplitFile)
+ .toUri().toString(), splitMetaInfo.getStartOffset());
allSplitMetaInfo[i] = new JobSplit.TaskSplitMetaInfo(splitIndex,
splitMetaInfo.getLocations(), splitMetaInfo.getInputDataLength());
}
in.close();
return allSplitMetaInfo;
}
-
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/cee79f63/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/TezTypeConverters.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/TezTypeConverters.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/TezTypeConverters.java
index 1fc71b0..768d347 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/TezTypeConverters.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/TezTypeConverters.java
@@ -34,6 +34,8 @@ import org.apache.tez.engine.records.TezDependentTaskCompletionEvent;
public class TezTypeConverters {
+ // TODO Remove unused methods
+
// Tez objects will be imported. Others will use the fully qualified name when
// required.
// All public methods named toYarn / toTez / toMapReduce
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/cee79f63/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/MRTask.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/MRTask.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/MRTask.java
index d7d721f..276294c 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/MRTask.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/MRTask.java
@@ -149,7 +149,7 @@ extends RunningTaskContext {
jobConf.set(Constants.TEZ_ENGINE_TASK_ATTEMPT_ID,
taskAttemptId.toString());
-
+
initResourceCalculatorPlugin();
LOG.info("MRTask.inited: taskAttemptId = " + taskAttemptId.toString());
@@ -695,8 +695,10 @@ extends RunningTaskContext {
public void localizeConfiguration(JobConf jobConf)
throws IOException, InterruptedException {
- jobConf.set(JobContext.TASK_ID, taskAttemptId.getTaskID().toString());
- jobConf.set(JobContext.TASK_ATTEMPT_ID, taskAttemptId.toString());
+ jobConf.set(JobContext.TASK_ID, IDConverter
+ .toMRTaskAttemptId(taskAttemptId).toString());
+ jobConf.set(JobContext.TASK_ATTEMPT_ID,
+ IDConverter.toMRTaskAttemptId(taskAttemptId).toString());
jobConf.setInt(JobContext.TASK_PARTITION,
taskAttemptId.getTaskID().getId());
jobConf.set(JobContext.ID, taskAttemptId.getTaskID().getVertexID().getDAGId().toString());
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/cee79f63/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/map/MapProcessor.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/map/MapProcessor.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/map/MapProcessor.java
index a6ab986..3558739 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/map/MapProcessor.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/map/MapProcessor.java
@@ -69,11 +69,7 @@ public class MapProcessor extends MRTask implements Processor {
public void initialize(Configuration conf, Master master) throws IOException,
InterruptedException {
super.initialize(conf, master);
- TaskSplitMetaInfo[] allMetaInfo = readSplits();
- TaskSplitMetaInfo thisTaskMetaInfo = allMetaInfo[tezEngineTaskContext
- .getTaskAttemptId().getTaskID().getId()];
- splitMetaInfo = new TaskSplitIndex(thisTaskMetaInfo.getSplitLocation(),
- thisTaskMetaInfo.getStartOffset());
+
}
@Override
@@ -81,6 +77,14 @@ public class MapProcessor extends MRTask implements Processor {
final Input[] ins,
final Output[] outs)
throws IOException, InterruptedException {
+
+ // Read split information.
+ TaskSplitMetaInfo[] allMetaInfo = readSplits();
+ TaskSplitMetaInfo thisTaskMetaInfo = allMetaInfo[tezEngineTaskContext
+ .getTaskAttemptId().getTaskID().getId()];
+ splitMetaInfo = new TaskSplitIndex(thisTaskMetaInfo.getSplitLocation(),
+ thisTaskMetaInfo.getStartOffset());
+
MRTaskReporter reporter = new MRTaskReporter(getTaskReporter());
boolean useNewApi = jobConf.getUseNewMapper();
initTask(jobConf, taskAttemptId.getTaskID().getVertexID().getDAGId(),
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/cee79f63/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/TestUmbilicalProtocol.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/TestUmbilicalProtocol.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/TestUmbilicalProtocol.java
index 025bf61..840cb31 100644
--- a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/TestUmbilicalProtocol.java
+++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/TestUmbilicalProtocol.java
@@ -111,7 +111,7 @@ public class TestUmbilicalProtocol implements TezTaskUmbilicalProtocol {
@Override
public boolean canCommit(TezTaskAttemptID taskid) throws IOException {
LOG.info("Got 'can-commit' from " + taskid);
- return false;
+ return true;
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/cee79f63/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/MapUtils.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/MapUtils.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/MapUtils.java
index aec6667..0b6bc5f 100644
--- a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/MapUtils.java
+++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/MapUtils.java
@@ -18,40 +18,37 @@
package org.apache.tez.mapreduce.processor;
-import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.spy;
-
import java.io.IOException;
-import java.util.Collections;
+import java.util.List;
import java.util.Random;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.SequenceFileInputFormat;
-import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitIndex;
+import org.apache.hadoop.mapreduce.split.JobSplit;
+import org.apache.hadoop.mapreduce.split.JobSplit.SplitMetaInfo;
+import org.apache.hadoop.mapreduce.split.SplitMetaInfoReaderTez;
import org.apache.tez.common.InputSpec;
import org.apache.tez.common.OutputSpec;
import org.apache.tez.common.TezEngineTaskContext;
+import org.apache.tez.common.TezJobConfig;
import org.apache.tez.common.TezTaskUmbilicalProtocol;
-import org.apache.tez.engine.api.Input;
-import org.apache.tez.engine.api.Output;
-import org.apache.tez.engine.api.Processor;
import org.apache.tez.engine.api.Task;
-import org.apache.tez.engine.lib.output.LocalOnFileSorterOutput;
import org.apache.tez.engine.runtime.RuntimeUtils;
-import org.apache.tez.engine.task.RuntimeTask;
import org.apache.tez.mapreduce.TezTestUtils;
-import org.apache.tez.mapreduce.input.SimpleInput;
+import org.apache.tez.mapreduce.hadoop.MRJobConfig;
import org.apache.tez.mapreduce.processor.map.MapProcessor;
public class MapUtils {
@@ -62,8 +59,7 @@ public class MapUtils {
createInputSplit(FileSystem fs, Path workDir, JobConf job, Path file)
throws IOException {
FileInputFormat.setInputPaths(job, workDir);
-
-
+
// create a file with length entries
SequenceFile.Writer writer =
SequenceFile.createWriter(fs, job, file,
@@ -92,30 +88,59 @@ public class MapUtils {
"file = " + ((FileSplit)splits[0]).getPath());
return splits[0];
}
+
+ final private static FsPermission JOB_FILE_PERMISSION = FsPermission
+ .createImmutable((short) 0644); // rw-r--r--
+
+ // Writes the split file and its meta-info file into TASK_LOCAL_RESOURCE_DIR
+ // (falling back to its default), where SplitMetaInfoReaderTez reads them from.
+ private static void writeSplitFiles(FileSystem fs, JobConf conf,
+ InputSplit split) throws IOException {
+ String localResourceDir = conf.get(TezJobConfig.TASK_LOCAL_RESOURCE_DIR,
+ TezJobConfig.DEFAULT_TASK_LOCAL_RESOURCE_DIR);
+ Path jobSplitFile = new Path(localResourceDir, MRJobConfig.JOB_SPLIT);
+ FSDataOutputStream out = FileSystem.create(fs, jobSplitFile,
+ new FsPermission(JOB_FILE_PERMISSION));
+
+ long offset = out.getPos();
+ Text.writeString(out, split.getClass().getName());
+ split.write(out);
+ out.close();
+
+ String[] locations = split.getLocations();
+ SplitMetaInfo info =
+ new JobSplit.SplitMetaInfo(locations, offset, split.getLength());
+
+ // Same directory as the split file, so both resolve identically when read.
+ Path jobSplitMetaInfoFile = new Path(localResourceDir,
+ MRJobConfig.JOB_SPLIT_METAINFO);
+
+ FSDataOutputStream outMeta = FileSystem.create(fs, jobSplitMetaInfoFile,
+ new FsPermission(JOB_FILE_PERMISSION));
+ outMeta.write(SplitMetaInfoReaderTez.META_SPLIT_FILE_HEADER);
+ WritableUtils.writeVInt(outMeta, SplitMetaInfoReaderTez.META_SPLIT_VERSION);
+ WritableUtils.writeVInt(outMeta, 1); // Only 1 split meta info being written
+ info.write(outMeta);
+ outMeta.close();
+ }
public static Task runMapProcessor(FileSystem fs, Path workDir,
- JobConf jobConf,
- int mapId, Path mapInput,
+ JobConf jobConf, int mapId, Path mapInput,
TezTaskUmbilicalProtocol umbilical,
- Class<?> outputClazz) throws Exception {
+ String vertexName, List<InputSpec> inputSpecs,
+ List<OutputSpec> outputSpecs) throws Exception {
jobConf.setInputFormat(SequenceFileInputFormat.class);
InputSplit split = createInputSplit(fs, workDir, jobConf, mapInput);
- TezEngineTaskContext taskContext =
- new TezEngineTaskContext(
- TezTestUtils.getMockTaskAttemptId(0, 0, mapId, 0), "tez",
- "tez", "TODO_vertexName", MapProcessor.class.getName(),
- Collections.singletonList(new InputSpec("srcVertex", 0,
- SimpleInput.class.getName())),
- Collections.singletonList(new OutputSpec("targetVertex", 0,
- outputClazz.getName())));
+
+ writeSplitFiles(fs, jobConf, split);
+ TezEngineTaskContext taskContext = new TezEngineTaskContext(
+ TezTestUtils.getMockTaskAttemptId(0, 0, mapId, 0), "testuser",
+ "testJob", vertexName, MapProcessor.class.getName(),
+ inputSpecs, outputSpecs);
Task t = RuntimeUtils.createRuntimeTask(taskContext);
t.initialize(jobConf, umbilical);
- SimpleInput[] real = ((SimpleInput[])t.getInputs());
- SimpleInput[] inputs = spy(real);
- doReturn(split).when(inputs[0]).getOldSplitDetails(any(TaskSplitIndex.class));
- t.getProcessor().process(inputs, t.getOutputs());
+ t.getProcessor().process(t.getInputs(), t.getOutputs());
return t;
}
-
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/cee79f63/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/map/TestMapProcessor.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/map/TestMapProcessor.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/map/TestMapProcessor.java
index f09340c..cda15fb 100644
--- a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/map/TestMapProcessor.java
+++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/map/TestMapProcessor.java
@@ -19,9 +19,11 @@ package org.apache.tez.mapreduce.processor.map;
import java.io.IOException;
+import java.util.Collections;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DataInputBuffer;
@@ -29,6 +31,8 @@ import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.tez.common.Constants;
+import org.apache.tez.common.InputSpec;
+import org.apache.tez.common.OutputSpec;
import org.apache.tez.common.TezJobConfig;
import org.apache.tez.engine.api.Task;
import org.apache.tez.engine.common.sort.impl.IFile;
@@ -37,11 +41,15 @@ import org.apache.tez.engine.common.task.local.output.TezTaskOutput;
import org.apache.tez.engine.lib.output.InMemorySortedOutput;
import org.apache.tez.engine.lib.output.LocalOnFileSorterOutput;
import org.apache.tez.mapreduce.TestUmbilicalProtocol;
+import org.apache.tez.mapreduce.hadoop.MultiStageMRConfToTezTranslator;
+import org.apache.tez.mapreduce.hadoop.MultiStageMRConfigUtil;
+import org.apache.tez.mapreduce.input.SimpleInput;
import org.apache.tez.mapreduce.processor.MapUtils;
import org.jboss.netty.buffer.BigEndianHeapChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.TruncatedChannelBuffer;
import org.jboss.netty.handler.stream.ChunkedStream;
+import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Ignore;
@@ -49,9 +57,7 @@ import org.junit.Test;
public class TestMapProcessor {
- private static final Log LOG = LogFactory.getLog(TestMapProcessor.class);
-
- JobConf job;
+ private static final Log LOG = LogFactory.getLog(TestMapProcessor.class);
private static JobConf defaultConf = new JobConf();
private static FileSystem localFs = null;
@@ -63,32 +69,51 @@ public class TestMapProcessor {
throw new RuntimeException("init failure", e);
}
}
+ @SuppressWarnings("deprecation")
private static Path workDir =
new Path(new Path(System.getProperty("test.build.data", "/tmp")),
"TestMapProcessor").makeQualified(localFs);
TezTaskOutput mapOutputs = new TezLocalTaskOutputFiles();
-
- @Before
- public void setUp() {
- job = new JobConf(defaultConf);
+
+ public void setUpJobConf(JobConf job) {
job.set(TezJobConfig.LOCAL_DIR, workDir.toString());
job.setClass(
Constants.TEZ_ENGINE_TASK_OUTPUT_MANAGER,
TezLocalTaskOutputFiles.class,
TezTaskOutput.class);
job.setNumReduceTasks(1);
- mapOutputs.setConf(job);
+ }
+
+ @Before
+ @After
+ public void cleanup() throws Exception {
+ localFs.delete(workDir, true);
}
@Test
- @Ignore
public void testMapProcessor() throws Exception {
- localFs.delete(workDir, true);
- MapUtils.runMapProcessor(
- localFs, workDir, job, 0, new Path(workDir, "map0"),
- new TestUmbilicalProtocol(),
- LocalOnFileSorterOutput.class).close();
+ String vertexName = MultiStageMRConfigUtil.getInitialMapVertexName();
+ JobConf jobConf = new JobConf(defaultConf);
+ setUpJobConf(jobConf);
+ TezTaskOutput mapOutputs = new TezLocalTaskOutputFiles();
+ mapOutputs.setConf(jobConf);
+
+ Configuration conf = MultiStageMRConfToTezTranslator.convertMRToLinearTez(jobConf);
+ Configuration stageConf = MultiStageMRConfigUtil.getConfForVertex(conf,
+ vertexName);
+
+ JobConf job = new JobConf(stageConf);
+
+ job.set(TezJobConfig.TASK_LOCAL_RESOURCE_DIR, new Path(workDir,
+ "localized-resources").toUri().toString());
+
+ MapUtils.runMapProcessor(localFs, workDir, job, 0,
+ new Path(workDir, "map0"), new TestUmbilicalProtocol(), vertexName,
+ Collections.singletonList(new InputSpec("NullVertex", 0,
+ SimpleInput.class.getName())),
+ Collections.singletonList(new OutputSpec("FakeVertex", 1,
+ LocalOnFileSorterOutput.class.getName()))).close();
Path mapOutputFile = mapOutputs.getInputFile(0);
LOG.info("mapOutputFile = " + mapOutputFile);
@@ -115,16 +140,34 @@ public class TestMapProcessor {
@Test
@Ignore
public void testMapProcessorWithInMemSort() throws Exception {
+
+ String vertexName = MultiStageMRConfigUtil.getInitialMapVertexName();
+
final int partitions = 2;
- job.setNumReduceTasks(partitions);
- job.setInt(TezJobConfig.TEZ_ENGINE_TASK_OUTDEGREE, partitions);
+ JobConf jobConf = new JobConf(defaultConf);
+ jobConf.setNumReduceTasks(partitions);
+ setUpJobConf(jobConf);
+ TezTaskOutput mapOutputs = new TezLocalTaskOutputFiles();
+ mapOutputs.setConf(jobConf);
+
+ Configuration conf = MultiStageMRConfToTezTranslator.convertMRToLinearTez(jobConf);
+ Configuration stageConf = MultiStageMRConfigUtil.getConfForVertex(conf,
+ vertexName);
+
+ JobConf job = new JobConf(stageConf);
+ job.set(TezJobConfig.TASK_LOCAL_RESOURCE_DIR, new Path(workDir,
+ "localized-resources").toUri().toString());
localFs.delete(workDir, true);
Task t =
MapUtils.runMapProcessor(
localFs, workDir, job, 0, new Path(workDir, "map0"),
- new TestUmbilicalProtocol(true),
- InMemorySortedOutput.class);
+ new TestUmbilicalProtocol(true), vertexName,
+ Collections.singletonList(new InputSpec("NullVertex", 0,
+ SimpleInput.class.getName())),
+ Collections.singletonList(new OutputSpec("FakeVertex", 1,
+ InMemorySortedOutput.class.getName()))
+ );
InMemorySortedOutput[] outputs = (InMemorySortedOutput[])t.getOutputs();
verifyInMemSortedStream(outputs[0], 0, 4096);
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/cee79f63/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java
index afc21a7..69571e1 100644
--- a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java
+++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java
@@ -22,40 +22,44 @@ import java.util.Collections;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.JobContext;
+import org.apache.hadoop.mapred.SequenceFileOutputFormat;
import org.apache.tez.common.Constants;
import org.apache.tez.common.InputSpec;
import org.apache.tez.common.OutputSpec;
import org.apache.tez.common.TezEngineTaskContext;
import org.apache.tez.common.TezJobConfig;
-import org.apache.tez.engine.api.Input;
-import org.apache.tez.engine.api.Output;
import org.apache.tez.engine.api.Task;
import org.apache.tez.engine.common.task.local.output.TezLocalTaskOutputFiles;
import org.apache.tez.engine.common.task.local.output.TezTaskOutput;
import org.apache.tez.engine.lib.input.LocalMergedInput;
import org.apache.tez.engine.lib.output.LocalOnFileSorterOutput;
import org.apache.tez.engine.runtime.RuntimeUtils;
-import org.apache.tez.engine.task.RuntimeTask;
import org.apache.tez.mapreduce.TestUmbilicalProtocol;
import org.apache.tez.mapreduce.TezTestUtils;
+import org.apache.tez.mapreduce.hadoop.IDConverter;
+import org.apache.tez.mapreduce.hadoop.MultiStageMRConfToTezTranslator;
+import org.apache.tez.mapreduce.hadoop.MultiStageMRConfigUtil;
+import org.apache.tez.mapreduce.input.SimpleInput;
import org.apache.tez.mapreduce.output.SimpleOutput;
import org.apache.tez.mapreduce.processor.MapUtils;
+import org.junit.After;
+import org.junit.Assert;
import org.junit.Before;
-import org.junit.Ignore;
import org.junit.Test;
public class TestReduceProcessor {
private static final Log LOG = LogFactory.getLog(TestReduceProcessor.class);
-
- JobConf job;
-
+
private static JobConf defaultConf = new JobConf();
private static FileSystem localFs = null;
static {
@@ -66,13 +70,12 @@ public class TestReduceProcessor {
throw new RuntimeException("init failure", e);
}
}
+ @SuppressWarnings("deprecation")
private static Path workDir =
new Path(new Path(System.getProperty("test.build.data", "/tmp")),
"TestReduceProcessor").makeQualified(localFs);
- @Before
- public void setUp() {
- job = new JobConf(defaultConf);
+ public void setUpJobConf(JobConf job) {
job.set(TezJobConfig.LOCAL_DIR, workDir.toString());
job.setClass(
Constants.TEZ_ENGINE_TASK_OUTPUT_MANAGER,
@@ -80,37 +83,84 @@ public class TestReduceProcessor {
TezTaskOutput.class);
job.setNumReduceTasks(1);
}
-
+
+ @Before
+ @After
+ public void cleanup() throws Exception {
+ localFs.delete(workDir, true);
+ }
+
@Test
- @Ignore
public void testReduceProcessor() throws Exception {
- localFs.delete(workDir, true);
+ String mapVertexName = MultiStageMRConfigUtil.getInitialMapVertexName();
+ String reduceVertexName = MultiStageMRConfigUtil.getFinalReduceVertexName();
+ JobConf jobConf = new JobConf(defaultConf);
+ setUpJobConf(jobConf);
+ TezTaskOutput mapOutputs = new TezLocalTaskOutputFiles();
+ mapOutputs.setConf(jobConf);
+
+ Configuration conf = MultiStageMRConfToTezTranslator.convertMRToLinearTez(jobConf);
+ Configuration mapStageConf = MultiStageMRConfigUtil.getConfForVertex(conf,
+ mapVertexName);
+
+ JobConf mapConf = new JobConf(mapStageConf);
+
+ mapConf.set(TezJobConfig.TASK_LOCAL_RESOURCE_DIR, new Path(workDir,
+ "localized-resources").toUri().toString());
+
// Run a map
- MapUtils.runMapProcessor(
- localFs, workDir, job, 0, new Path(workDir, "map0"),
- new TestUmbilicalProtocol(),
- LocalOnFileSorterOutput.class);
+ MapUtils.runMapProcessor(localFs, workDir, mapConf, 0,
+ new Path(workDir, "map0"), new TestUmbilicalProtocol(), mapVertexName,
+ Collections.singletonList(new InputSpec("NullVertex", 0,
+ SimpleInput.class.getName())),
+ Collections.singletonList(new OutputSpec("FakeVertex", 1,
+ LocalOnFileSorterOutput.class.getName())));
LOG.info("Starting reduce...");
- FileOutputFormat.setOutputPath(job, new Path(workDir, "output"));
+
+
+ Configuration reduceStageConf = MultiStageMRConfigUtil.getConfForVertex(conf,
+ reduceVertexName);
+ JobConf reduceConf = new JobConf(reduceStageConf);
+ reduceConf.setOutputFormat(SequenceFileOutputFormat.class);
+ FileOutputFormat.setOutputPath(reduceConf, new Path(workDir, "output"));
// Now run a reduce
TezEngineTaskContext taskContext = new TezEngineTaskContext(
- TezTestUtils.getMockTaskAttemptId(0, 0, 0, 0), "tez",
- "tez", "TODO_vertexName", ReduceProcessor.class.getName(),
- Collections.singletonList(new InputSpec("TODO_srcVertexName", 1,
+ TezTestUtils.getMockTaskAttemptId(0, 1, 0, 0), "testUser",
+ "testJob", reduceVertexName, ReduceProcessor.class.getName(),
+ Collections.singletonList(new InputSpec(mapVertexName, 1,
LocalMergedInput.class.getName())),
- Collections.singletonList(new OutputSpec("TODO_targetVertexName", 1,
+ Collections.singletonList(new OutputSpec("", 1,
SimpleOutput.class.getName())));
-
- job.set(JobContext.TASK_ATTEMPT_ID,
- taskContext.getTaskAttemptId().toString());
+
Task t = RuntimeUtils.createRuntimeTask(taskContext);
- t.initialize(job, new TestUmbilicalProtocol());
+ t.initialize(reduceConf, new TestUmbilicalProtocol());
t.run();
t.close();
+
+ // Could this use a utility class instead (a MapOutputFile derivative, or
+ // by instantiating the OutputCommitter)?
+ Path reduceOutputDir = new Path(new Path(workDir, "output"),
+ "_temporary/0/" + IDConverter
+ .toMRTaskId(taskContext.getTaskAttemptId().getTaskID()));
+ Path reduceOutputFile = new Path(reduceOutputDir, "part-00000");
+
+ @SuppressWarnings("deprecation")
+ SequenceFile.Reader reader = new SequenceFile.Reader(localFs, reduceOutputFile, reduceConf);
+
+ LongWritable key = new LongWritable();
+ Text value = new Text();
+ long prev = Long.MIN_VALUE;
+ while (reader.next(key, value)) {
+ if (prev != Long.MIN_VALUE) {
+ Assert.assertTrue(prev <= key.get()); // reduce output must be key-sorted
+ }
+ prev = key.get(); // track every key, not only after the first comparison
+ }
+ reader.close();
}
}