You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2013/05/19 05:51:48 UTC
git commit: TEZ-134. Untangle MR configuration in YarnTezDagChild. (sseth)
Updated Branches:
refs/heads/TEZ-1 0d5027d00 -> be6d4bc09
TEZ-134. Untangle MR configuration in YarnTezDagChild. (sseth)
Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/be6d4bc0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/be6d4bc0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/be6d4bc0
Branch: refs/heads/TEZ-1
Commit: be6d4bc096d9bae363cbcf1a3dc5ec67b057507d
Parents: 0d5027d
Author: Siddharth Seth <ss...@apache.org>
Authored: Sat May 18 20:51:06 2013 -0700
Committer: Siddharth Seth <ss...@apache.org>
Committed: Sat May 18 20:51:06 2013 -0700
----------------------------------------------------------------------
.../java/org/apache/tez/common/TezJobConfig.java | 19 +-
.../org/apache/hadoop/mapred/YarnTezDagChild.java | 291 ++++-----------
.../main/java/org/apache/tez/engine/api/Task.java | 2 +
.../apache/tez/common/TezEngineTaskContext.java | 5 +
.../tez/engine/common/shuffle/impl/Shuffle.java | 2 +-
.../tez/engine/common/sort/impl/TezMerger.java | 2 +-
.../task/local/output/TezLocalTaskOutputFiles.java | 4 +-
.../task/local/output/TezTaskOutputFiles.java | 2 +-
.../apache/tez/engine/runtime/RuntimeUtils.java | 22 +-
.../org/apache/tez/engine/task/RuntimeTask.java | 21 +-
.../org/apache/hadoop/mapred/LocalJobRunner.java | 8 +-
.../apache/tez/mapreduce/task/MRRuntimeTask.java | 262 +++++++++++++
.../apache/tez/mapreduce/processor/MapUtils.java | 50 +++
.../mapreduce/processor/map/TestMapProcessor.java | 23 +-
.../processor/reduce/TestReduceProcessor.java | 23 +-
15 files changed, 477 insertions(+), 259 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/be6d4bc0/tez-common/src/main/java/org/apache/tez/common/TezJobConfig.java
----------------------------------------------------------------------
diff --git a/tez-common/src/main/java/org/apache/tez/common/TezJobConfig.java b/tez-common/src/main/java/org/apache/tez/common/TezJobConfig.java
index 4a7abea..0d6f6be 100644
--- a/tez-common/src/main/java/org/apache/tez/common/TezJobConfig.java
+++ b/tez-common/src/main/java/org/apache/tez/common/TezJobConfig.java
@@ -62,11 +62,16 @@ public class TezJobConfig {
public static final long DEFAULT_RECORDS_BEFORE_PROGRESS = 10000;
/**
- *
+ * List of directories avialble to the engine.
*/
- public static final String LOCAL_DIR = "tez.engine.local.dir";
- public static final String DEFAULT_LOCAL_DIR = "/tmp";
-
+ public static final String LOCAL_DIRS = "tez.engine.local.dirs";
+ public static final String DEFAULT_LOCAL_DIRS = "/tmp";
+
+ /**
+ * One local dir for the speicfic job.
+ */
+ public static final String JOB_LOCAL_DIR = "tez.engine.job.local.dir";
+
/**
* The directory which contains the localized files for this task.
*/
@@ -74,6 +79,8 @@ public class TezJobConfig {
public static final String TASK_LOCAL_RESOURCE_DIR = "tez.engine.task-local-resource.dir";
public static final String DEFAULT_TASK_LOCAL_RESOURCE_DIR = "/tmp";
+ public static final String TEZ_TASK_WORKING_DIR = "tez.engine.task.working.dir";
+
/**
*
*/
@@ -299,4 +306,8 @@ public class TezJobConfig {
* credentials.
*/
public static final String DAG_CREDENTIALS_BINARY = "tez.dag.credentials.binary";
+
+
+ public static final String APPLICATION_ATTEMPT_ID =
+ "tez.job.application.attempt.id";
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/be6d4bc0/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java b/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java
index 2295b6a..6fd6eff 100644
--- a/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java
+++ b/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java
@@ -20,17 +20,9 @@ package org.apache.hadoop.mapred;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
-import java.io.ByteArrayOutputStream;
import java.io.IOException;
-import java.io.OutputStream;
-import java.io.PrintStream;
import java.net.InetSocketAddress;
-import java.net.URI;
import java.security.PrivilegedExceptionAction;
-import java.util.ArrayList;
-import java.util.List;
-
-import javax.crypto.SecretKey;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -39,16 +31,8 @@ import org.apache.hadoop.fs.FSError;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalDirAllocator;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.ipc.RPC;
-import org.apache.hadoop.mapreduce.MRConfig;
-import org.apache.hadoop.mapreduce.filecache.DistributedCache;
-import org.apache.hadoop.mapreduce.security.TokenCache;
-import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
-import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
-import org.apache.hadoop.metrics2.source.JvmMetrics;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.SecurityUtil;
@@ -72,16 +56,15 @@ import org.apache.tez.common.TezJobConfig;
import org.apache.tez.common.TezTaskUmbilicalProtocol;
import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.engine.api.Task;
+import org.apache.tez.engine.common.security.JobTokenIdentifier;
+import org.apache.tez.engine.common.security.TokenCache;
import org.apache.tez.engine.runtime.RuntimeUtils;
-import org.apache.tez.mapreduce.hadoop.DeprecatedKeys;
-import org.apache.tez.mapreduce.hadoop.MRJobConfig;
-import org.apache.tez.mapreduce.hadoop.MultiStageMRConfigUtil;
+import org.apache.tez.engine.task.RuntimeTask;
import org.apache.tez.mapreduce.input.SimpleInput;
import org.apache.tez.mapreduce.output.SimpleOutput;
-import org.apache.tez.mapreduce.processor.MRTask;
/**
- * The main() for TEZ MapReduce task processes.
+ * The main() for TEZ Task processes.
*/
public class YarnTezDagChild {
@@ -91,11 +74,9 @@ public class YarnTezDagChild {
Thread.setDefaultUncaughtExceptionHandler(new YarnUncaughtExceptionHandler());
LOG.debug("Child starting");
- DeprecatedKeys.init();
-
- final JobConf defaultConf = new JobConf();
- // HACK Eventually load the DagConf for security etc setup.
-// defaultConf.addResource(MRJobConfig.JOB_CONF_FILE);
+ final Configuration defaultConf = new Configuration();
+ // Security settings will be loaded based on core-site and core-default. Don't
+ // depend on the jobConf for this.
UserGroupInformation.setConfiguration(defaultConf);
String host = args[0];
@@ -121,9 +102,10 @@ public class YarnTezDagChild {
// Create TaskUmbilicalProtocol as actual task owner.
UserGroupInformation taskOwner =
UserGroupInformation.createRemoteUser(appID.toString());
- Token<JobTokenIdentifier> jt = TokenCache.getJobToken(credentials);
- SecurityUtil.setTokenService(jt, address);
- taskOwner.addToken(jt);
+
+ Token<JobTokenIdentifier> jobToken = TokenCache.getJobToken(credentials);
+ SecurityUtil.setTokenService(jobToken, address);
+ taskOwner.addToken(jobToken);
final TezTaskUmbilicalProtocol umbilical =
taskOwner.doAs(new PrivilegedExceptionAction<TezTaskUmbilicalProtocol>() {
@Override
@@ -142,9 +124,8 @@ public class YarnTezDagChild {
ContainerTask containerTask = null;
UserGroupInformation childUGI = null;
TezTaskAttemptID taskAttemptId = null;
- MRTask task = null;
ContainerContext containerContext = new ContainerContext(containerId, pid);
-
+
try {
while (true) {
// poll for new task
@@ -165,19 +146,19 @@ public class YarnTezDagChild {
}
taskContext = (TezEngineTaskContext) containerTask
.getTezEngineTaskContext();
- LOG.info("XXXX: New container task context:"
+ LOG.info("DEBUG: New container task context:"
+ taskContext.toString());
taskAttemptId = taskContext.getTaskAttemptId();
final Task t = createAndConfigureTezTask(taskContext, umbilical,
- credentials, jt,
+ credentials, jobToken,
containerId.getApplicationAttemptId().getAttemptId());
- task = (MRTask) t.getProcessor();
- final JobConf job = task.getConf();
- // Initiate Java VM metrics
- JvmMetrics.initSingleton(containerId.toString(), job.getSessionId());
+ final Configuration conf = ((RuntimeTask)t).getConfiguration();
+
+ // TODO Initiate Java VM metrics
+ // JvmMetrics.initSingleton(containerId.toString(), job.getSessionId());
childUGI = UserGroupInformation.createRemoteUser(System
.getenv(ApplicationConstants.Environment.USER.toString()));
// Add tokens to new user so that it may execute its task correctly.
@@ -186,7 +167,7 @@ public class YarnTezDagChild {
childUGI.doAs(new PrivilegedExceptionAction<Object>() {
@Override
public Object run() throws Exception {
- runTezTask(t, umbilical, job); // run the task
+ runTezTask(t, umbilical, conf); // run the task
return null;
}
});
@@ -196,34 +177,6 @@ public class YarnTezDagChild {
} catch (FSError e) {
LOG.fatal("FSError from child", e);
umbilical.fsError(taskAttemptId, e.getMessage());
- } catch (Exception exception) {
- LOG.warn("Exception running child : "
- + StringUtils.stringifyException(exception));
- try {
- if (task != null) {
- // do cleanup for the task
- if (childUGI == null) { // no need to job into doAs block
- task.taskCleanup(umbilical);
- } else {
- final MRTask taskFinal = task;
- childUGI.doAs(new PrivilegedExceptionAction<Object>() {
- @Override
- public Object run() throws Exception {
- taskFinal.taskCleanup(umbilical);
- return null;
- }
- });
- }
- }
- } catch (Exception e) {
- LOG.info("Exception cleaning up: " + StringUtils.stringifyException(e));
- }
- // Report back any failures, for diagnostic purposes
- ByteArrayOutputStream baos = new ByteArrayOutputStream();
- exception.printStackTrace(new PrintStream(baos));
- if (taskAttemptId != null) {
- umbilical.fatalError(taskAttemptId, baos.toString());
- }
} catch (Throwable throwable) {
LOG.fatal("Error running child : "
+ StringUtils.stringifyException(throwable));
@@ -249,29 +202,36 @@ public class YarnTezDagChild {
* out an output directory.
* @throws IOException
*/
- private static void configureLocalDirs(JobConf job) throws IOException {
+ /**
+ * Configure tez-local-dirs, tez-localized-file-dir, etc. Also create these
+ * dirs.
+ */
+
+ private static void configureLocalDirs(Configuration conf) throws IOException {
String[] localSysDirs = StringUtils.getTrimmedStrings(
System.getenv(Environment.LOCAL_DIRS.name()));
- job.setStrings(TezJobConfig.LOCAL_DIR, localSysDirs);
- job.set(TezJobConfig.TASK_LOCAL_RESOURCE_DIR,
+ conf.setStrings(TezJobConfig.LOCAL_DIRS, localSysDirs);
+ conf.set(TezJobConfig.TASK_LOCAL_RESOURCE_DIR,
System.getenv(Environment.PWD.name()));
- LOG.info(TezJobConfig.LOCAL_DIR + " for child: " +
- job.get(TezJobConfig.LOCAL_DIR));
+
+ LOG.info(TezJobConfig.LOCAL_DIRS + " for child: " +
+ conf.get(TezJobConfig.LOCAL_DIRS));
LOG.info(TezJobConfig.TASK_LOCAL_RESOURCE_DIR + " for child: "
- + job.get(TezJobConfig.TASK_LOCAL_RESOURCE_DIR));
- LocalDirAllocator lDirAlloc = new LocalDirAllocator(TezJobConfig.LOCAL_DIR);
+ + conf.get(TezJobConfig.TASK_LOCAL_RESOURCE_DIR));
+
+ LocalDirAllocator lDirAlloc = new LocalDirAllocator(TezJobConfig.LOCAL_DIRS);
Path workDir = null;
// First, try to find the JOB_LOCAL_DIR on this host.
try {
- workDir = lDirAlloc.getLocalPathToRead("work", job);
+ workDir = lDirAlloc.getLocalPathToRead("work", conf);
} catch (DiskErrorException e) {
// DiskErrorException means dir not found. If not found, it will
// be created below.
}
if (workDir == null) {
// JOB_LOCAL_DIR doesn't exist on this host -- Create it.
- workDir = lDirAlloc.getLocalPathForWrite("work", job);
- FileSystem lfs = FileSystem.getLocal(job).getRaw();
+ workDir = lDirAlloc.getLocalPathForWrite("work", conf);
+ FileSystem lfs = FileSystem.getLocal(conf).getRaw();
boolean madeDir = false;
try {
madeDir = lfs.mkdirs(workDir);
@@ -281,155 +241,29 @@ public class YarnTezDagChild {
// at the same time. If this task loses the race, it's okay because
// the directory already exists.
madeDir = true;
- workDir = lDirAlloc.getLocalPathToRead("work", job);
+ workDir = lDirAlloc.getLocalPathToRead("work", conf);
}
if (!madeDir) {
throw new IOException("Mkdirs failed to create "
+ workDir.toString());
}
}
- // TODO TEZ This likely needs fixing to make sure things work when there are multiple local-dirs etc.
- job.set(MRJobConfig.JOB_LOCAL_DIR, workDir.toString());
- }
-
- private static JobConf configureTask(MRTask task, Credentials credentials,
- Token<JobTokenIdentifier> jt, int appAttemptId)
- throws IOException, InterruptedException {
- JobConf job = task.getConf();
-
- // Set it in conf, so as to be able to be used the the OutputCommitter.
- job.setInt(MRJobConfig.APPLICATION_ATTEMPT_ID, appAttemptId);
-
- // set tcp nodelay
- job.setBoolean("ipc.client.tcpnodelay", true);
- job.setClass(MRConfig.TASK_LOCAL_OUTPUT_CLASS,
- YarnOutputFiles.class, MapOutputFile.class);
- // set the jobTokenFile into task
- SecretKey sk = JobTokenSecretManager.createSecretKey(jt.getPassword());
-
- task.setJobTokenSecret(sk);
-// task.setJobTokenSecret(
-// JobTokenSecretManager.createSecretKey(jt.getPassword()));
-
- // setup the child's MRConfig.LOCAL_DIR.
- configureLocalDirs(job);
-
- // setup the child's attempt directories
- // Do the task-type specific localization
- task.localizeConfiguration(job);
-
- // Set up the DistributedCache related configs
- setupDistributedCacheConfig(job);
-
- // Overwrite the localized task jobconf which is linked to in the current
- // work-dir.
- Path localTaskFile = new Path(MRJobConfig.JOB_CONF_FILE);
- writeLocalJobFile(localTaskFile, job);
- task.setConf(job);
- return job;
- }
-
- /**
- * Set up the DistributedCache related configs to make
- * {@link DistributedCache#getLocalCacheFiles(Configuration)}
- * and
- * {@link DistributedCache#getLocalCacheArchives(Configuration)}
- * working.
- * @param job
- * @throws IOException
- */
- private static void setupDistributedCacheConfig(final JobConf job)
- throws IOException {
-
- String localWorkDir = System.getenv("PWD");
- // ^ ^ all symlinks are created in the current work-dir
-
- // Update the configuration object with localized archives.
- URI[] cacheArchives = DistributedCache.getCacheArchives(job);
- if (cacheArchives != null) {
- List<String> localArchives = new ArrayList<String>();
- for (int i = 0; i < cacheArchives.length; ++i) {
- URI u = cacheArchives[i];
- Path p = new Path(u);
- Path name =
- new Path((null == u.getFragment()) ? p.getName()
- : u.getFragment());
- String linkName = name.toUri().getPath();
- localArchives.add(new Path(localWorkDir, linkName).toUri().getPath());
- }
- if (!localArchives.isEmpty()) {
- job.set(MRJobConfig.CACHE_LOCALARCHIVES, StringUtils
- .arrayToString(localArchives.toArray(new String[localArchives
- .size()])));
- }
- }
-
- // Update the configuration object with localized files.
- URI[] cacheFiles = DistributedCache.getCacheFiles(job);
- if (cacheFiles != null) {
- List<String> localFiles = new ArrayList<String>();
- for (int i = 0; i < cacheFiles.length; ++i) {
- URI u = cacheFiles[i];
- Path p = new Path(u);
- Path name =
- new Path((null == u.getFragment()) ? p.getName()
- : u.getFragment());
- String linkName = name.toUri().getPath();
- localFiles.add(new Path(localWorkDir, linkName).toUri().getPath());
- }
- if (!localFiles.isEmpty()) {
- job.set(MRJobConfig.CACHE_LOCALFILES,
- StringUtils.arrayToString(localFiles
- .toArray(new String[localFiles.size()])));
- }
- }
- }
-
- private static final FsPermission urw_gr =
- FsPermission.createImmutable((short) 0640);
-
- /**
- * Write the task specific job-configuration file.
- * @throws IOException
- */
- private static void writeLocalJobFile(Path jobFile, JobConf conf)
- throws IOException {
- FileSystem localFs = FileSystem.getLocal(conf);
- localFs.delete(jobFile);
- OutputStream out = null;
- try {
- out = FileSystem.create(localFs, jobFile, urw_gr);
- conf.writeXml(out);
- } finally {
- IOUtils.cleanup(LOG, out);
- }
+ conf.set(TezJobConfig.JOB_LOCAL_DIR, workDir.toString());
}
-
+
private static Task createAndConfigureTezTask(
- TezEngineTaskContext taskContext,
- TezTaskUmbilicalProtocol master,
- Credentials credentials, Token<JobTokenIdentifier> jt,
- int appAttemptId)
- throws IOException, InterruptedException {
+ TezEngineTaskContext taskContext, TezTaskUmbilicalProtocol master,
+ Credentials cxredentials, Token<JobTokenIdentifier> jobToken,
+ int appAttemptId) throws IOException, InterruptedException {
- Configuration jConf = new JobConf(MRJobConfig.JOB_CONF_FILE);
- Configuration conf = MultiStageMRConfigUtil.getConfForVertex(jConf,
- taskContext.getVertexName());
+ Configuration conf = new Configuration();
+ // set tcp nodelay
+ conf.setBoolean("ipc.client.tcpnodelay", true);
+ conf.setInt(TezJobConfig.APPLICATION_ATTEMPT_ID, appAttemptId);
+
+ configureLocalDirs(conf);
+
- // TOOD Post MRR
- // A single file per vertex will likely be a better solution. Does not
- // require translation - client can take care of this. Will work independent
- // of whether the configuration is for intermediate tasks or not. Has the
- // overhead of localizing multiple files per job - i.e. the client would
- // need to write these files to hdfs, add them as local resources per
- // vertex. A solution like this may be more practical once it's possible to
- // submit configuration parameters to the AM and effectively tasks via RPC.
-
- // TODO Avoid all this extra config manipulation.
- // FIXME we need I/O/p level configs to be used in init below
- final JobConf job = new JobConf(conf);
- job.setCredentials(credentials);
-
// FIXME need Input/Output vertices else we have this hack
if (taskContext.getInputSpecList().isEmpty()) {
taskContext.getInputSpecList().add(
@@ -440,26 +274,37 @@ public class YarnTezDagChild {
new OutputSpec("null", 0, SimpleOutput.class.getName()));
}
Task t = RuntimeUtils.createRuntimeTask(taskContext);
-
+ t.initialize(conf, master);
+
// FIXME wrapper should initialize all of processor, inputs and outputs
// Currently, processor is inited via task init
// and processor then inits inputs and outputs
- t.initialize(job, master);
-
- MRTask task = (MRTask)t.getProcessor();
- configureTask(task, credentials, jt, appAttemptId);
-
return t;
}
private static void runTezTask(
- Task t, TezTaskUmbilicalProtocol master, JobConf job)
+ Task t, TezTaskUmbilicalProtocol master, Configuration conf)
throws IOException, InterruptedException {
// use job-specified working directory
- FileSystem.get(job).setWorkingDirectory(job.getWorkingDirectory());
+ FileSystem.get(conf).setWorkingDirectory(getWorkingDirectory(conf));
// Run!
t.run();
t.close();
}
+
+ private static Path getWorkingDirectory(Configuration conf) {
+ String name = conf.get(JobContext.WORKING_DIR);
+ if (name != null) {
+ return new Path(name);
+ } else {
+ try {
+ Path dir = FileSystem.get(conf).getWorkingDirectory();
+ conf.set(JobContext.WORKING_DIR, dir.toString());
+ return dir;
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/be6d4bc0/tez-engine-api/src/main/java/org/apache/tez/engine/api/Task.java
----------------------------------------------------------------------
diff --git a/tez-engine-api/src/main/java/org/apache/tez/engine/api/Task.java b/tez-engine-api/src/main/java/org/apache/tez/engine/api/Task.java
index ea50a06..c08eda6 100644
--- a/tez-engine-api/src/main/java/org/apache/tez/engine/api/Task.java
+++ b/tez-engine-api/src/main/java/org/apache/tez/engine/api/Task.java
@@ -74,4 +74,6 @@ public interface Task {
*/
public void close() throws IOException, InterruptedException;
+
+ public Configuration getConfiguration();
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/be6d4bc0/tez-engine/src/main/java/org/apache/tez/common/TezEngineTaskContext.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/common/TezEngineTaskContext.java b/tez-engine/src/main/java/org/apache/tez/common/TezEngineTaskContext.java
index 5823de6..93aeb0b 100644
--- a/tez-engine/src/main/java/org/apache/tez/common/TezEngineTaskContext.java
+++ b/tez-engine/src/main/java/org/apache/tez/common/TezEngineTaskContext.java
@@ -55,6 +55,11 @@ public class TezEngineTaskContext extends TezTaskContext {
this.processorName = processorName;
}
+ public String getRuntimeName() {
+ // FIXME. Add this to the DAG configuration, and fetch from there.
+ return "org.apache.tez.mapreduce.task.MRRuntimeTask";
+ }
+
public String getProcessorName() {
return processorName;
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/be6d4bc0/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/Shuffle.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/Shuffle.java b/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/Shuffle.java
index 6a065de..202abd6 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/Shuffle.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/Shuffle.java
@@ -80,7 +80,7 @@ public class Shuffle implements ExceptionReporter {
FileSystem localFS = FileSystem.getLocal(this.conf);
LocalDirAllocator localDirAllocator =
- new LocalDirAllocator(TezJobConfig.LOCAL_DIR);
+ new LocalDirAllocator(TezJobConfig.LOCAL_DIRS);
copyPhase = this.runningTaskContext.getProgress().addPhase("copy", 0.33f);
mergePhase = this.runningTaskContext.getProgress().addPhase("merge", 0.66f);
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/be6d4bc0/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/TezMerger.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/TezMerger.java b/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/TezMerger.java
index 829495b..eeea764 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/TezMerger.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/TezMerger.java
@@ -57,7 +57,7 @@ public class TezMerger {
// Local directories
private static LocalDirAllocator lDirAlloc =
- new LocalDirAllocator(TezJobConfig.LOCAL_DIR);
+ new LocalDirAllocator(TezJobConfig.LOCAL_DIRS);
public static
TezRawKeyValueIterator merge(Configuration conf, FileSystem fs,
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/be6d4bc0/tez-engine/src/main/java/org/apache/tez/engine/common/task/local/output/TezLocalTaskOutputFiles.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/common/task/local/output/TezLocalTaskOutputFiles.java b/tez-engine/src/main/java/org/apache/tez/engine/common/task/local/output/TezLocalTaskOutputFiles.java
index ccfbd78..69484af 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/common/task/local/output/TezLocalTaskOutputFiles.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/common/task/local/output/TezLocalTaskOutputFiles.java
@@ -42,7 +42,7 @@ import org.apache.tez.dag.records.TezTaskID;
public class TezLocalTaskOutputFiles extends TezTaskOutput {
private LocalDirAllocator lDirAlloc =
- new LocalDirAllocator(TezJobConfig.LOCAL_DIR);
+ new LocalDirAllocator(TezJobConfig.LOCAL_DIRS);
public TezLocalTaskOutputFiles() {
}
@@ -223,7 +223,7 @@ public class TezLocalTaskOutputFiles extends TezTaskOutput {
}
private String[] getLocalDirs() throws IOException {
- return getConf().getStrings(TezJobConfig.LOCAL_DIR);
+ return getConf().getStrings(TezJobConfig.LOCAL_DIRS);
}
@SuppressWarnings("deprecation")
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/be6d4bc0/tez-engine/src/main/java/org/apache/tez/engine/common/task/local/output/TezTaskOutputFiles.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/common/task/local/output/TezTaskOutputFiles.java b/tez-engine/src/main/java/org/apache/tez/engine/common/task/local/output/TezTaskOutputFiles.java
index 1bd65e2..5fb6519 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/common/task/local/output/TezTaskOutputFiles.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/common/task/local/output/TezTaskOutputFiles.java
@@ -56,7 +56,7 @@ public class TezTaskOutputFiles extends TezTaskOutput {
// assume configured to $localdir/usercache/$user/appcache/$appId
private LocalDirAllocator lDirAlloc =
- new LocalDirAllocator(TezJobConfig.LOCAL_DIR);
+ new LocalDirAllocator(TezJobConfig.LOCAL_DIRS);
private Path getAttemptOutputDir() {
LOG.info("DEBUG: getAttemptOutputDir: "
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/be6d4bc0/tez-engine/src/main/java/org/apache/tez/engine/runtime/RuntimeUtils.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/runtime/RuntimeUtils.java b/tez-engine/src/main/java/org/apache/tez/engine/runtime/RuntimeUtils.java
index e04d405..900c2f0 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/runtime/RuntimeUtils.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/runtime/RuntimeUtils.java
@@ -66,6 +66,7 @@ public class RuntimeUtils {
+ ", Processor: " + taskContext.getProcessorName()
+ ", InputCount=" + taskContext.getInputSpecList().size()
+ ", OutputCount=" + taskContext.getOutputSpecList().size());
+
RuntimeTask t = null;
try {
Class<?> processorClazz =
@@ -108,11 +109,30 @@ public class RuntimeUtils {
outputs[i] = output;
}
}
- t = new RuntimeTask(processor, inputs, outputs);
+ // t = new RuntimeTask(taskContext, processor, inputs, outputs);
+ t = createRuntime(taskContext, processor, inputs, outputs);
} catch (ClassNotFoundException e) {
throw new YarnException("Unable to initialize RuntimeTask, context="
+ taskContext, e);
}
return t;
}
+
+ private static RuntimeTask createRuntime(TezEngineTaskContext taskContext,
+ Processor processor, Input[] inputs, Output[] outputs) {
+ try {
+ // TODO Change this to use getNewInstance
+ Class<?> runtimeClazz = Class.forName(taskContext.getRuntimeName());
+ Constructor<?> ctor = runtimeClazz.getConstructor(
+ TezEngineTaskContext.class, Processor.class, Input[].class,
+ Output[].class);
+ ctor.setAccessible(true);
+ return (RuntimeTask) ctor.newInstance(taskContext, processor, inputs, outputs);
+ } catch (ClassNotFoundException e) {
+ throw new RuntimeException("Unable to load runtimeClass: "
+ + taskContext.getRuntimeName(), e);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/be6d4bc0/tez-engine/src/main/java/org/apache/tez/engine/task/RuntimeTask.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/task/RuntimeTask.java b/tez-engine/src/main/java/org/apache/tez/engine/task/RuntimeTask.java
index 3e2c6f2..871f3ba 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/task/RuntimeTask.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/task/RuntimeTask.java
@@ -20,6 +20,7 @@ package org.apache.tez.engine.task;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
+import org.apache.tez.common.TezEngineTaskContext;
import org.apache.tez.engine.api.Input;
import org.apache.tez.engine.api.Master;
import org.apache.tez.engine.api.Output;
@@ -28,17 +29,19 @@ import org.apache.tez.engine.api.Task;
public class RuntimeTask implements Task {
- private final Input[] inputs;
- private final Output[] outputs;
- private final Processor processor;
+ protected final Input[] inputs;
+ protected final Output[] outputs;
+ protected final Processor processor;
- private Configuration conf;
- private Master master;
+ protected TezEngineTaskContext taskContext;
+ protected Configuration conf;
+ protected Master master;
- public RuntimeTask(
+ public RuntimeTask(TezEngineTaskContext taskContext,
Processor processor,
Input[] inputs,
Output[] outputs) {
+ this.taskContext = taskContext;
this.inputs = inputs;
this.processor = processor;
this.outputs = outputs;
@@ -74,7 +77,13 @@ public class RuntimeTask implements Task {
public void close() throws IOException, InterruptedException {
// NOTE: Allow processor to close input/output
+ // This can be changed to close input/output since MRRuntimeTask is used for
+ // MR jobs, which changes the order.
processor.close();
}
+ @Override
+ public Configuration getConfiguration() {
+ return this.conf;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/be6d4bc0/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/LocalJobRunner.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/LocalJobRunner.java b/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/LocalJobRunner.java
index 63472d2..5c181e2 100644
--- a/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/LocalJobRunner.java
+++ b/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/LocalJobRunner.java
@@ -465,7 +465,7 @@ public class LocalJobRunner implements ClientProtocol {
LOG.info("XXX mapId: " + i +
" LOCAL_DIR = " +
mapOutputFiles.get(mapId).getConf().get(
- TezJobConfig.LOCAL_DIR));
+ TezJobConfig.LOCAL_DIRS));
Path mapOut = mapOutputFiles.get(mapId).getOutputFile();
TezTaskOutput localOutputFile = new TezLocalTaskOutputFiles();
localOutputFile.setConf(localConf);
@@ -902,7 +902,7 @@ public class LocalJobRunner implements ClientProtocol {
TaskAttemptID taskAttemptID, String user, JobConf conf) {
String[] localDirs =
conf.getTrimmedStrings(
- TezJobConfig.LOCAL_DIR, TezJobConfig.DEFAULT_LOCAL_DIR);
+ TezJobConfig.LOCAL_DIRS, TezJobConfig.DEFAULT_LOCAL_DIRS);
String jobId = taskAttemptID.getJobID().toString();
String taskId = taskAttemptID.getTaskID().toString();
boolean isCleanup = false;
@@ -913,9 +913,9 @@ public class LocalJobRunner implements ClientProtocol {
childMapredLocalDir.append("," + localDirs[i] + Path.SEPARATOR
+ getLocalTaskDir(user, jobId, taskId, isCleanup));
}
- LOG.info(TezJobConfig.LOCAL_DIR + " for child : " + taskAttemptID +
+ LOG.info(TezJobConfig.LOCAL_DIRS + " for child : " + taskAttemptID +
" is " + childMapredLocalDir);
- conf.set(TezJobConfig.LOCAL_DIR, childMapredLocalDir.toString());
+ conf.set(TezJobConfig.LOCAL_DIRS, childMapredLocalDir.toString());
conf.setClass(Constants.TEZ_ENGINE_TASK_OUTPUT_MANAGER,
TezLocalTaskOutputFiles.class, TezTaskOutput.class);
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/be6d4bc0/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/task/MRRuntimeTask.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/task/MRRuntimeTask.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/task/MRRuntimeTask.java
new file mode 100644
index 0000000..6091d4b
--- /dev/null
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/task/MRRuntimeTask.java
@@ -0,0 +1,262 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.mapreduce.task;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.PrintStream;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.List;
+
+import javax.crypto.SecretKey;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSError;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapOutputFile;
+import org.apache.hadoop.mapreduce.MRConfig;
+import org.apache.hadoop.mapreduce.filecache.DistributedCache;
+import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
+import org.apache.tez.common.TezEngineTaskContext;
+import org.apache.tez.common.TezJobConfig;
+import org.apache.tez.common.TezTaskUmbilicalProtocol;
+import org.apache.tez.engine.api.Input;
+import org.apache.tez.engine.api.Master;
+import org.apache.tez.engine.api.Output;
+import org.apache.tez.engine.api.Processor;
+import org.apache.tez.engine.common.security.JobTokenIdentifier;
+import org.apache.tez.engine.common.security.TokenCache;
+import org.apache.tez.engine.task.RuntimeTask;
+import org.apache.tez.mapreduce.hadoop.DeprecatedKeys;
+import org.apache.tez.mapreduce.hadoop.MRJobConfig;
+import org.apache.tez.mapreduce.hadoop.MultiStageMRConfigUtil;
+import org.apache.tez.mapreduce.processor.MRTask;
+import org.apache.tez.mapreduce.task.impl.YarnOutputFiles;
+
+@SuppressWarnings("deprecation")
+public class MRRuntimeTask extends RuntimeTask {
+
+ private static final Log LOG = LogFactory.getLog(MRRuntimeTask.class);
+
+ private MRTask mrTask;
+
+ public MRRuntimeTask(TezEngineTaskContext taskContext, Processor processor,
+ Input[] inputs, Output[] outputs) {
+ super(taskContext, processor, inputs, outputs);
+ }
+
+ @Override
+ public void initialize(Configuration conf, Master master) throws IOException,
+ InterruptedException {
+
+ DeprecatedKeys.init();
+
+ Configuration mrConf = new Configuration(conf);
+ mrConf.addResource(MRJobConfig.JOB_CONF_FILE);
+ Configuration taskConf = MultiStageMRConfigUtil.getConfForVertex(mrConf,
+ taskContext.getVertexName());
+
+ // TODO Avoid all this extra config manipulation.
+ // FIXME we need I/O/p level configs to be used in init below
+
+ // TOOD Post MRR
+ // A single file per vertex will likely be a better solution. Does not
+ // require translation - client can take care of this. Will work independent
+ // of whether the configuration is for intermediate tasks or not. Has the
+ // overhead of localizing multiple files per job - i.e. the client would
+ // need to write these files to hdfs, add them as local resources per
+ // vertex. A solution like this may be more practical once it's possible to
+ // submit configuration parameters to the AM and effectively tasks via RPC.
+
+ final JobConf job = new JobConf(taskConf);
+
+ MRTask mrTask = (MRTask) getProcessor();
+ this.mrTask = mrTask;
+ configureMRTask(job, mrTask);
+
+ this.conf = job;
+ this.master = master;
+
+ // NOTE: Allow processor to initialize input/output
+ processor.initialize(this.conf, this.master);
+ }
+
+ @Override
+ public void run() throws IOException, InterruptedException {
+ TezTaskUmbilicalProtocol umbilical = (TezTaskUmbilicalProtocol) master;
+ try {
+ super.run();
+ } catch (FSError e) {
+ throw e;
+ } catch (Exception exception) {
+ LOG.warn("Exception running child : "
+ + StringUtils.stringifyException(exception));
+ try {
+ if (mrTask != null) {
+ mrTask.taskCleanup(umbilical);
+ }
+ } catch (Exception e) {
+ LOG.info("Exception cleanup up: " + StringUtils.stringifyException(e));
+ }
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ exception.printStackTrace(new PrintStream(baos));
+ if (taskContext.getTaskAttemptId() != null) {
+ umbilical.fatalError(taskContext.getTaskAttemptId(), baos.toString());
+ }
+ }
+ }
+
+ @Override
+ public void close() throws IOException, InterruptedException {
+ // NOTE: Allow processor to close input/output
+ processor.close();
+ }
+
+ private static void configureMRTask(JobConf job, MRTask task)
+ throws IOException, InterruptedException {
+
+ Credentials credentials = UserGroupInformation.getCurrentUser()
+ .getCredentials();
+ job.setCredentials(credentials);
+ // TODO Can this be avoided all together. Have the MRTezOutputCommitter use
+ // the Tez parameter.
+ // TODO This could be fetched from the env if YARN is setting it for all
+ // Containers.
+ // Set it in conf, so as to be able to be used the the OutputCommitter.
+ job.setInt(MRJobConfig.APPLICATION_ATTEMPT_ID,
+ job.getInt(TezJobConfig.APPLICATION_ATTEMPT_ID, -1));
+
+ job.setClass(MRConfig.TASK_LOCAL_OUTPUT_CLASS, YarnOutputFiles.class,
+ MapOutputFile.class); // MR
+
+ Token<JobTokenIdentifier> jobToken = TokenCache.getJobToken(credentials);
+ if (jobToken != null) {
+ // Will MR ever run without a job token.
+ SecretKey sk = JobTokenSecretManager.createSecretKey(jobToken
+ .getPassword());
+ task.setJobTokenSecret(sk);
+ } else {
+ LOG.warn("No job token set");
+ }
+
+ job.set(MRJobConfig.JOB_LOCAL_DIR, job.get(TezJobConfig.JOB_LOCAL_DIR));
+
+ // setup the child's attempt directories
+ // Do the task-type specific localization
+ task.localizeConfiguration(job);
+
+ // Set up the DistributedCache related configs
+ setupDistributedCacheConfig(job);
+
+ // Overwrite the localized task jobconf which is linked to in the current
+ // work-dir.
+ Path localTaskFile = new Path(
+ job.get(TezJobConfig.TASK_LOCAL_RESOURCE_DIR),
+ MRJobConfig.JOB_CONF_FILE);
+ writeLocalJobFile(localTaskFile, job);
+
+ task.setConf(job);
+ }
+
+ /**
+ * Set up the DistributedCache related configs to make
+ * {@link DistributedCache#getLocalCacheFiles(Configuration)} and
+ * {@link DistributedCache#getLocalCacheArchives(Configuration)} working.
+ *
+ * @param job
+ * @throws IOException
+ */
+ private static void setupDistributedCacheConfig(final JobConf job)
+ throws IOException {
+
+ String localWorkDir = System.getenv(System.getenv(Environment.PWD.name()));
+ // ^ ^ all symlinks are created in the current work-dir
+
+ // Update the configuration object with localized archives.
+ URI[] cacheArchives = DistributedCache.getCacheArchives(job);
+ if (cacheArchives != null) {
+ List<String> localArchives = new ArrayList<String>();
+ for (int i = 0; i < cacheArchives.length; ++i) {
+ URI u = cacheArchives[i];
+ Path p = new Path(u);
+ Path name = new Path((null == u.getFragment()) ? p.getName()
+ : u.getFragment());
+ String linkName = name.toUri().getPath();
+ localArchives.add(new Path(localWorkDir, linkName).toUri().getPath());
+ }
+ if (!localArchives.isEmpty()) {
+ job.set(MRJobConfig.CACHE_LOCALARCHIVES, StringUtils
+ .arrayToString(localArchives.toArray(new String[localArchives
+ .size()])));
+ }
+ }
+
+ // Update the configuration object with localized files.
+ URI[] cacheFiles = DistributedCache.getCacheFiles(job);
+ if (cacheFiles != null) {
+ List<String> localFiles = new ArrayList<String>();
+ for (int i = 0; i < cacheFiles.length; ++i) {
+ URI u = cacheFiles[i];
+ Path p = new Path(u);
+ Path name = new Path((null == u.getFragment()) ? p.getName()
+ : u.getFragment());
+ String linkName = name.toUri().getPath();
+ localFiles.add(new Path(localWorkDir, linkName).toUri().getPath());
+ }
+ if (!localFiles.isEmpty()) {
+ job.set(MRJobConfig.CACHE_LOCALFILES, StringUtils
+ .arrayToString(localFiles.toArray(new String[localFiles.size()])));
+ }
+ }
+ }
+
+ private static final FsPermission urw_gr = FsPermission
+ .createImmutable((short) 0640);
+
+ /**
+ * Write the task specific job-configuration file.
+ *
+ * @throws IOException
+ */
+ private static void writeLocalJobFile(Path jobFile, JobConf conf)
+ throws IOException {
+ FileSystem localFs = FileSystem.getLocal(conf);
+ localFs.delete(jobFile);
+ OutputStream out = null;
+ try {
+ out = FileSystem.create(localFs, jobFile, urw_gr);
+ conf.writeXml(out);
+ } finally {
+ IOUtils.cleanup(LOG, out);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/be6d4bc0/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/MapUtils.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/MapUtils.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/MapUtils.java
index 0b6bc5f..48a1113 100644
--- a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/MapUtils.java
+++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/MapUtils.java
@@ -24,14 +24,17 @@ import java.util.Random;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalDirAllocator;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.mapred.FileAlreadyExistsException;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputSplit;
@@ -40,6 +43,8 @@ import org.apache.hadoop.mapred.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.split.JobSplit;
import org.apache.hadoop.mapreduce.split.JobSplit.SplitMetaInfo;
import org.apache.hadoop.mapreduce.split.SplitMetaInfoReaderTez;
+import org.apache.hadoop.util.DiskChecker.DiskErrorException;
+import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
import org.apache.tez.common.InputSpec;
import org.apache.tez.common.OutputSpec;
import org.apache.tez.common.TezEngineTaskContext;
@@ -55,6 +60,51 @@ public class MapUtils {
private static final Log LOG = LogFactory.getLog(MapUtils.class);
+  /**
+   * Points the Tez local-dir settings at {@code localDir} and makes sure a
+   * "work" directory exists beneath the configured local dirs.
+   *
+   * <p>Sets {@link TezJobConfig#LOCAL_DIRS} to the single directory given and
+   * {@link TezJobConfig#TASK_LOCAL_RESOURCE_DIR} to the value of the
+   * {@code PWD} environment variable. It then sets
+   * {@link TezJobConfig#JOB_LOCAL_DIR} to the "work" directory, reusing an
+   * existing one or creating it.
+   *
+   * @param conf configuration updated in place
+   * @param localDir local directory to use for task-local data
+   * @throws IOException if the "work" directory cannot be created
+   */
+  public static void configureLocalDirs(Configuration conf, String localDir)
+      throws IOException {
+    conf.setStrings(TezJobConfig.LOCAL_DIRS, new String[] { localDir });
+    conf.set(TezJobConfig.TASK_LOCAL_RESOURCE_DIR,
+        System.getenv(Environment.PWD.name()));
+
+    LOG.info(TezJobConfig.LOCAL_DIRS + " for child: "
+        + conf.get(TezJobConfig.LOCAL_DIRS));
+    LOG.info(TezJobConfig.TASK_LOCAL_RESOURCE_DIR + " for child: "
+        + conf.get(TezJobConfig.TASK_LOCAL_RESOURCE_DIR));
+
+    LocalDirAllocator allocator =
+        new LocalDirAllocator(TezJobConfig.LOCAL_DIRS);
+    Path workDir = findWorkDir(allocator, conf);
+    if (workDir == null) {
+      workDir = createWorkDir(allocator, conf);
+    }
+    conf.set(TezJobConfig.JOB_LOCAL_DIR, workDir.toString());
+  }
+
+  /** Returns the existing "work" dir under the local dirs, or null if none. */
+  private static Path findWorkDir(LocalDirAllocator allocator,
+      Configuration conf) throws IOException {
+    try {
+      return allocator.getLocalPathToRead("work", conf);
+    } catch (DiskErrorException notFound) {
+      // Not present on any local dir; the caller will create it.
+      return null;
+    }
+  }
+
+  /** Allocates a writable "work" dir and creates it on the raw local FS. */
+  private static Path createWorkDir(LocalDirAllocator allocator,
+      Configuration conf) throws IOException {
+    Path workDir = allocator.getLocalPathForWrite("work", conf);
+    FileSystem rawLocalFs = FileSystem.getLocal(conf).getRaw();
+    boolean created;
+    try {
+      created = rawLocalFs.mkdirs(workDir);
+    } catch (FileAlreadyExistsException e) {
+      // Another task may have created the directory concurrently; losing that
+      // race is fine, so resolve and reuse the existing directory.
+      // NOTE(review): this catches the FileAlreadyExistsException type this
+      // file imports (org.apache.hadoop.mapred); confirm it is the type that
+      // mkdirs can actually throw.
+      created = true;
+      workDir = allocator.getLocalPathToRead("work", conf);
+    }
+    if (!created) {
+      throw new IOException("Mkdirs failed to create " + workDir.toString());
+    }
+    return workDir;
+  }
+
private static InputSplit
createInputSplit(FileSystem fs, Path workDir, JobConf job, Path file)
throws IOException {
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/be6d4bc0/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/map/TestMapProcessor.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/map/TestMapProcessor.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/map/TestMapProcessor.java
index cda15fb..1e5fe9b 100644
--- a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/map/TestMapProcessor.java
+++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/map/TestMapProcessor.java
@@ -55,36 +55,40 @@ import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
+@SuppressWarnings("deprecation")
public class TestMapProcessor {
private static final Log LOG = LogFactory.getLog(TestMapProcessor.class);
private static JobConf defaultConf = new JobConf();
private static FileSystem localFs = null;
+ private static Path workDir = null;
static {
try {
defaultConf.set("fs.defaultFS", "file:///");
localFs = FileSystem.getLocal(defaultConf);
+ workDir =
+ new Path(new Path(System.getProperty("test.build.data", "/tmp")),
+ "TestMapProcessor").makeQualified(localFs);
+ MapUtils.configureLocalDirs(defaultConf, workDir.toString());
} catch (IOException e) {
throw new RuntimeException("init failure", e);
}
}
- @SuppressWarnings("deprecation")
- private static Path workDir =
- new Path(new Path(System.getProperty("test.build.data", "/tmp")),
- "TestMapProcessor").makeQualified(localFs);
+
+
TezTaskOutput mapOutputs = new TezLocalTaskOutputFiles();
public void setUpJobConf(JobConf job) {
- job.set(TezJobConfig.LOCAL_DIR, workDir.toString());
+ job.set(TezJobConfig.LOCAL_DIRS, workDir.toString());
job.setClass(
Constants.TEZ_ENGINE_TASK_OUTPUT_MANAGER,
TezLocalTaskOutputFiles.class,
TezTaskOutput.class);
job.setNumReduceTasks(1);
}
-
+
@Before
@After
public void cleanup() throws Exception {
@@ -98,15 +102,18 @@ public class TestMapProcessor {
setUpJobConf(jobConf);
TezTaskOutput mapOutputs = new TezLocalTaskOutputFiles();
mapOutputs.setConf(jobConf);
-
+
Configuration conf = MultiStageMRConfToTezTranslator.convertMRToLinearTez(jobConf);
+ conf.setInt(TezJobConfig.APPLICATION_ATTEMPT_ID, 0);
+
Configuration stageConf = MultiStageMRConfigUtil.getConfForVertex(conf,
vertexName);
JobConf job = new JobConf(stageConf);
-
+
job.set(TezJobConfig.TASK_LOCAL_RESOURCE_DIR, new Path(workDir,
"localized-resources").toUri().toString());
+
MapUtils.runMapProcessor(localFs, workDir, job, 0,
new Path(workDir, "map0"), new TestUmbilicalProtocol(), vertexName,
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/be6d4bc0/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java
index 69571e1..d17b0be 100644
--- a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java
+++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java
@@ -56,27 +56,30 @@ import org.junit.Before;
import org.junit.Test;
+@SuppressWarnings("deprecation")
public class TestReduceProcessor {
private static final Log LOG = LogFactory.getLog(TestReduceProcessor.class);
private static JobConf defaultConf = new JobConf();
- private static FileSystem localFs = null;
+ private static FileSystem localFs = null;
+ private static Path workDir = null;
static {
try {
defaultConf.set("fs.defaultFS", "file:///");
localFs = FileSystem.getLocal(defaultConf);
+ workDir =
+ new Path(new Path(System.getProperty("test.build.data", "/tmp")),
+ "TestReduceProcessor").makeQualified(localFs);
+
+ MapUtils.configureLocalDirs(defaultConf, workDir.toString());
} catch (IOException e) {
throw new RuntimeException("init failure", e);
}
}
- @SuppressWarnings("deprecation")
- private static Path workDir =
- new Path(new Path(System.getProperty("test.build.data", "/tmp")),
- "TestReduceProcessor").makeQualified(localFs);
public void setUpJobConf(JobConf job) {
- job.set(TezJobConfig.LOCAL_DIR, workDir.toString());
+ job.set(TezJobConfig.LOCAL_DIRS, workDir.toString());
job.setClass(
Constants.TEZ_ENGINE_TASK_OUTPUT_MANAGER,
TezLocalTaskOutputFiles.class,
@@ -101,6 +104,7 @@ public class TestReduceProcessor {
mapOutputs.setConf(jobConf);
Configuration conf = MultiStageMRConfToTezTranslator.convertMRToLinearTez(jobConf);
+ conf.setInt(TezJobConfig.APPLICATION_ATTEMPT_ID, 0);
Configuration mapStageConf = MultiStageMRConfigUtil.getConfForVertex(conf,
mapVertexName);
@@ -109,6 +113,7 @@ public class TestReduceProcessor {
mapConf.set(TezJobConfig.TASK_LOCAL_RESOURCE_DIR, new Path(workDir,
"localized-resources").toUri().toString());
+
// Run a map
MapUtils.runMapProcessor(localFs, workDir, mapConf, 0,
new Path(workDir, "map0"), new TestUmbilicalProtocol(), mapVertexName,
@@ -124,6 +129,8 @@ public class TestReduceProcessor {
reduceVertexName);
JobConf reduceConf = new JobConf(reduceStageConf);
reduceConf.setOutputFormat(SequenceFileOutputFormat.class);
+ reduceConf.set(TezJobConfig.TASK_LOCAL_RESOURCE_DIR, new Path(workDir,
+ "localized-resources").toUri().toString());
FileOutputFormat.setOutputPath(reduceConf, new Path(workDir, "output"));
// Now run a reduce
@@ -147,8 +154,8 @@ public class TestReduceProcessor {
.toMRTaskId(taskContext.getTaskAttemptId().getTaskID()));
Path reduceOutputFile = new Path(reduceOutputDir, "part-00000");
- @SuppressWarnings("deprecation")
- SequenceFile.Reader reader = new SequenceFile.Reader(localFs, reduceOutputFile, reduceConf);
+ SequenceFile.Reader reader = new SequenceFile.Reader(localFs,
+ reduceOutputFile, reduceConf);
LongWritable key = new LongWritable();
Text value = new Text();