You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by hi...@apache.org on 2013/04/19 01:54:28 UTC
svn commit: r1469642 [31/36] - in /incubator/tez/branches/TEZ-1: ./
example_jobs/ example_jobs/sampleInput/ example_jobs/wc_mr_6m_1r/
example_jobs/wc_mrr_6m_3r_3r/ ljr_helper/ tez-common/ tez-common/src/
tez-common/src/main/ tez-common/src/main/java/ t...
Added: incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/LocalJobRunner.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/LocalJobRunner.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/LocalJobRunner.java (added)
+++ incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/LocalJobRunner.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,932 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.ipc.ProtocolSignature;
+import org.apache.hadoop.mapreduce.Cluster.JobTrackerStatus;
+import org.apache.hadoop.mapreduce.ClusterMetrics;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.QueueInfo;
+import org.apache.hadoop.mapreduce.TaskCompletionEvent;
+import org.apache.hadoop.mapreduce.TaskTrackerInfo;
+import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapreduce.protocol.ClientProtocol;
+import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier;
+import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
+import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
+import org.apache.hadoop.mapreduce.split.SplitMetaInfoReader;
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
+import org.apache.hadoop.mapreduce.v2.LogParams;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.authorize.AccessControlList;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.tez.common.Constants;
+import org.apache.tez.common.ContainerTask;
+import org.apache.tez.common.InputSpec;
+import org.apache.tez.common.TezEngineTask;
+import org.apache.tez.common.TezJobConfig;
+import org.apache.tez.common.TezTaskStatus;
+import org.apache.tez.common.counters.TezCounters;
+import org.apache.tez.engine.api.Task;
+import org.apache.tez.engine.common.task.local.output.TezLocalTaskOutputFiles;
+import org.apache.tez.engine.common.task.local.output.TezTaskOutput;
+import org.apache.tez.engine.records.OutputContext;
+import org.apache.tez.engine.records.TezTaskAttemptID;
+import org.apache.tez.engine.records.TezTaskDependencyCompletionEventsUpdate;
+import org.apache.tez.engine.runtime.TezEngineFactory;
+import org.apache.tez.mapreduce.hadoop.ContainerContext;
+import org.apache.tez.mapreduce.hadoop.IDConverter;
+import org.apache.tez.mapreduce.hadoop.TezTaskUmbilicalProtocol;
+import org.apache.tez.mapreduce.hadoop.mapred.MRCounters;
+import org.apache.tez.mapreduce.hadoop.records.ProceedToCompletionResponse;
+import org.apache.tez.mapreduce.task.InitialTask;
+import org.apache.tez.mapreduce.task.LocalFinalTask;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import com.google.inject.Guice;
+import com.google.inject.Injector;
+
+/**
+ * Implements MapReduce locally, in-process, for debugging.
+ *
+ * <p>Each submitted job runs on its own {@link Thread}. Map tasks are executed
+ * on a fixed-size thread pool (size bounded by {@link #LOCAL_MAX_MAPS}).
+ * After the maps finish, at most one reduce task runs on the job thread. The
+ * job thread also serves as the task umbilical that in-process tasks report
+ * status to.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class LocalJobRunner implements ClientProtocol {
+  public static final Log LOG =
+      LogFactory.getLog(LocalJobRunner.class);
+
+  /** The maximum number of map tasks to run in parallel in LocalJobRunner */
+  public static final String LOCAL_MAX_MAPS =
+      "mapreduce.local.map.tasks.maximum";
+
+  private FileSystem fs;
+  private HashMap<JobID, Job> jobs = new HashMap<JobID, Job>();
+  private JobConf conf;
+  // Number of map / reduce tasks currently running in this runner. They are
+  // updated from executor and job threads and read by getClusterMetrics(),
+  // hence atomic.
+  private AtomicInteger map_tasks = new AtomicInteger(0);
+  private AtomicInteger reduce_tasks = new AtomicInteger(0);
+  final Random rand = new Random();
+
+  private LocalJobRunnerMetrics myMetrics = null;
+
+  private static final String jobDir = "localRunner/";
+
+  // Shared "nothing reported yet" placeholder stored in the per-task counter
+  // slots. It must never be incremented; aggregation always sums into a
+  // fresh TezCounters instance instead.
+  private static final TezCounters EMPTY_COUNTERS = new TezCounters();
+
+  public long getProtocolVersion(String protocol, long clientVersion) {
+    return ClientProtocol.versionID;
+  }
+
+  @Override
+  public ProtocolSignature getProtocolSignature(String protocol,
+      long clientVersion, int clientMethodsHash) throws IOException {
+    return ProtocolSignature.getProtocolSignature(
+        this, protocol, clientVersion, clientMethodsHash);
+  }
+
+  /**
+   * A single locally-running job. The thread body ({@link #run()}) drives the
+   * job. The object also implements the umbilical protocol that the
+   * in-process tasks call back into.
+   */
+  private class Job extends Thread implements TezTaskUmbilicalProtocol {
+    // The job directory on the system: JobClient places job configurations here.
+    // This is analogous to JobTracker's system directory.
+    private Path systemJobDir;
+    private Path systemJobFile;
+
+    // The job directory for the task. Analogous to a task's job directory.
+    private Path localJobDir;
+    private Path localJobFile;
+
+    private JobID id;
+    private JobConf job;
+
+    private int numMapTasks;
+    // Per-map progress and counters, indexed by position in mapIds.
+    private float [] partialMapProgress;
+    private TezCounters [] mapCounters;
+    private TezCounters reduceCounters;
+
+    private JobStatus status;
+    // MR attempt ids of the map tasks, in the order they started running.
+    private List<TaskAttemptID> mapIds = Collections.synchronizedList(
+        new ArrayList<TaskAttemptID>());
+
+    private JobProfile profile;
+    private FileSystem localFs;
+    // Set by killJob() from a client thread and read by this job thread.
+    volatile boolean killed = false;
+
+    private LocalDistributedCacheManager localDistributedCacheManager;
+
+    public long getProtocolVersion(String protocol, long clientVersion) {
+      return TaskUmbilicalProtocol.versionID;
+    }
+
+    @Override
+    public ProtocolSignature getProtocolSignature(String protocol,
+        long clientVersion, int clientMethodsHash) throws IOException {
+      return ProtocolSignature.getProtocolSignature(
+          this, protocol, clientVersion, clientMethodsHash);
+    }
+
+    /**
+     * Localizes the job configuration, registers the job with the runner and
+     * starts the job thread.
+     *
+     * @param jobid the id of the job being submitted
+     * @param jobSubmitDir directory containing the submitted job.xml
+     * @throws IOException if the configuration cannot be read or written
+     */
+    public Job(JobID jobid, String jobSubmitDir) throws IOException {
+      this.systemJobDir = new Path(jobSubmitDir);
+      this.systemJobFile = new Path(systemJobDir, "job.xml");
+      this.id = jobid;
+      JobConf conf = new JobConf(systemJobFile);
+      this.localFs = FileSystem.getLocal(conf);
+      this.localJobDir = localFs.makeQualified(conf.getLocalPath(jobDir));
+      this.localJobFile = new Path(this.localJobDir, id + ".xml");
+
+      // Manage the distributed cache. If there are files to be copied,
+      // this will trigger localFile to be re-written again.
+      localDistributedCacheManager = new LocalDistributedCacheManager();
+      localDistributedCacheManager.setup(conf);
+
+      // Write out configuration file. Instead of copying it from
+      // systemJobFile, we re-write it, since setup(), above, may have
+      // updated it.
+      OutputStream out = localFs.create(localJobFile);
+      try {
+        conf.writeXml(out);
+      } finally {
+        out.close();
+      }
+      this.job = new JobConf(localJobFile);
+
+      // Job (the current object) is a Thread, so we wrap its class loader.
+      if (localDistributedCacheManager.hasLocalClasspaths()) {
+        setContextClassLoader(localDistributedCacheManager.makeClassLoader(
+            getContextClassLoader()));
+      }
+
+      profile = new JobProfile(job.getUser(), id, systemJobFile.toString(),
+          "http://localhost:8080/", job.getJobName());
+      status = new JobStatus(id, 0.0f, 0.0f, JobStatus.RUNNING,
+          profile.getUser(), profile.getJobName(), profile.getJobFile(),
+          profile.getURL().toString());
+
+      jobs.put(id, this);
+
+      this.start();
+    }
+
+    /**
+     * A Runnable instance that handles a map task to be run by an executor.
+     * Any failure is captured in {@link #storedException} rather than thrown,
+     * so the job thread can inspect it after the executor drains.
+     */
+    protected class MapTaskRunnable implements Runnable {
+      private final int taskId;
+      private final TaskSplitMetaInfo info;
+      private final JobID jobId;
+      private final JobConf localConf;
+
+      // This is a reference to a shared object passed in by the
+      // external context; this delivers state to the reducers regarding
+      // where to fetch mapper outputs.
+      private final Map<TaskAttemptID, TezTaskOutput> mapOutputFiles;
+
+      public volatile Throwable storedException;
+
+      public MapTaskRunnable(TaskSplitMetaInfo info, int taskId, JobID jobId,
+          Map<TaskAttemptID, TezTaskOutput> mapOutputFiles) {
+        this.info = info;
+        this.taskId = taskId;
+        this.mapOutputFiles = mapOutputFiles;
+        this.jobId = jobId;
+        this.localConf = new JobConf(job);
+      }
+
+      public void run() {
+        try {
+          TaskAttemptID mapId = new TaskAttemptID(new TaskID(
+              jobId, TaskType.MAP, taskId), 0);
+          LOG.info("Starting task: " + mapId);
+          final String user =
+              UserGroupInformation.getCurrentUser().getShortUserName();
+          setupChildMapredLocalDirs(mapId, user, localConf);
+          localConf.setUser(user);
+
+          TezTaskAttemptID tezMapId =
+              IDConverter.fromMRTaskAttemptId(mapId);
+          mapIds.add(mapId);
+          TezEngineTask taskContext =
+              new TezEngineTask(
+                  tezMapId, user, localConf.getJobName(), "TODO_vertexName",
+                  InitialTask.class.getName(), null, null);
+          Injector injector = Guice.createInjector(new InitialTask());
+
+          TezTaskOutput mapOutput = new TezLocalTaskOutputFiles();
+          mapOutput.setConf(localConf);
+          mapOutputFiles.put(mapId, mapOutput);
+
+          try {
+            map_tasks.getAndIncrement();
+            myMetrics.launchMap(mapId);
+            TezEngineFactory factory = injector.getInstance(TezEngineFactory.class);
+            Task t = factory.createTask(taskContext);
+            t.initialize(localConf, Job.this);
+            t.run();
+            myMetrics.completeMap(mapId);
+          } finally {
+            map_tasks.getAndDecrement();
+          }
+
+          LOG.info("Finishing task: " + mapId);
+        } catch (Throwable e) {
+          this.storedException = e;
+        }
+      }
+    }
+
+    /**
+     * Create Runnables to encapsulate map tasks for use by the executor
+     * service.
+     * @param taskInfo Info about the map task splits
+     * @param jobId the job id
+     * @param mapOutputFiles a mapping from task attempts to output files
+     * @return a List of Runnables, one per map task.
+     */
+    protected List<MapTaskRunnable> getMapTaskRunnables(
+        TaskSplitMetaInfo [] taskInfo, JobID jobId,
+        Map<TaskAttemptID, TezTaskOutput> mapOutputFiles) {
+
+      int numTasks = 0;
+      ArrayList<MapTaskRunnable> list =
+          new ArrayList<MapTaskRunnable>(taskInfo.length);
+      for (TaskSplitMetaInfo task : taskInfo) {
+        list.add(new MapTaskRunnable(task, numTasks++, jobId,
+            mapOutputFiles));
+      }
+
+      return list;
+    }
+
+    /**
+     * Initialize the counters that will hold partial-progress from
+     * the various task attempts.
+     * @param numMaps the number of map tasks in this job.
+     */
+    private synchronized void initCounters(int numMaps) {
+      // Initialize state trackers for all map tasks.
+      this.partialMapProgress = new float[numMaps];
+      this.mapCounters = new TezCounters[numMaps];
+      for (int i = 0; i < numMaps; i++) {
+        this.mapCounters[i] = EMPTY_COUNTERS;
+      }
+
+      this.reduceCounters = EMPTY_COUNTERS;
+    }
+
+    /**
+     * Creates the executor service used to run map tasks.
+     *
+     * @param numMapTasks the total number of map tasks to be run
+     * @return an ExecutorService instance that handles map tasks
+     * @throws IllegalArgumentException if {@link #LOCAL_MAX_MAPS} is below 1
+     */
+    protected ExecutorService createMapExecutor(int numMapTasks) {
+
+      // Determine the size of the thread pool to use
+      int maxMapThreads = job.getInt(LOCAL_MAX_MAPS, 1);
+      if (maxMapThreads < 1) {
+        throw new IllegalArgumentException(
+            "Configured " + LOCAL_MAX_MAPS + " must be >= 1");
+      }
+      this.numMapTasks = numMapTasks;
+      maxMapThreads = Math.min(maxMapThreads, this.numMapTasks);
+      maxMapThreads = Math.max(maxMapThreads, 1); // In case of no tasks.
+
+      initCounters(this.numMapTasks);
+
+      LOG.debug("Starting thread pool executor.");
+      LOG.debug("Max local threads: " + maxMapThreads);
+      LOG.debug("Map tasks to process: " + this.numMapTasks);
+
+      // Create a new executor service to drain the work queue.
+      ThreadFactory tf = new ThreadFactoryBuilder()
+          .setNameFormat("LocalJobRunner Map Task Executor #%d")
+          .build();
+      ExecutorService executor = Executors.newFixedThreadPool(maxMapThreads, tf);
+
+      return executor;
+    }
+
+    /**
+     * Instantiates the job's output committer.
+     *
+     * @param newApiCommitter if true, obtain the committer from the new-API
+     *     OutputFormat; otherwise instantiate "mapred.output.committer.class"
+     *     (defaulting to {@link FileOutputCommitter})
+     * @param jobId the job id used to build a placeholder task attempt context
+     * @param conf configuration to read the output format / committer from
+     * @return the committer instance
+     * @throws Exception if the committer cannot be created
+     */
+    private org.apache.hadoop.mapreduce.OutputCommitter
+    createOutputCommitter(boolean newApiCommitter, JobID jobId, Configuration conf) throws Exception {
+      org.apache.hadoop.mapreduce.OutputCommitter committer = null;
+
+      LOG.info("OutputCommitter set in config "
+          + conf.get("mapred.output.committer.class"));
+
+      if (newApiCommitter) {
+        org.apache.hadoop.mapreduce.TaskID taskId =
+            new org.apache.hadoop.mapreduce.TaskID(jobId, TaskType.MAP, 0);
+        org.apache.hadoop.mapreduce.TaskAttemptID taskAttemptID =
+            new org.apache.hadoop.mapreduce.TaskAttemptID(taskId, 0);
+        org.apache.hadoop.mapreduce.TaskAttemptContext taskContext =
+            new TaskAttemptContextImpl(conf, taskAttemptID);
+        @SuppressWarnings("rawtypes")
+        OutputFormat outputFormat =
+            ReflectionUtils.newInstance(taskContext.getOutputFormatClass(), conf);
+        committer = outputFormat.getOutputCommitter(taskContext);
+      } else {
+        committer = ReflectionUtils.newInstance(conf.getClass(
+            "mapred.output.committer.class", FileOutputCommitter.class,
+            org.apache.hadoop.mapred.OutputCommitter.class), conf);
+      }
+      LOG.info("OutputCommitter is " + committer.getClass().getName());
+      return committer;
+    }
+
+    /**
+     * Drives the job: sets up the committer, runs all maps on the executor,
+     * moves map outputs into place and runs the (single) reduce. It then
+     * records the final run state and cleans up submit/local files.
+     */
+    @Override
+    public void run() {
+      JobID jobId = profile.getJobID();
+      JobContext jContext = new JobContextImpl(job, jobId);
+
+      org.apache.hadoop.mapreduce.OutputCommitter outputCommitter = null;
+      try {
+        // NOTE(review): this reads the runner-level conf rather than this
+        // job's JobConf (job); confirm that is intended.
+        outputCommitter = createOutputCommitter(conf.getUseNewMapper(), jobId, conf);
+      } catch (Exception e) {
+        LOG.info("Failed to createOutputCommitter", e);
+        return;
+      }
+
+      try {
+        TaskSplitMetaInfo[] taskSplitMetaInfos =
+            SplitMetaInfoReader.readSplitMetaInfo(jobId, localFs, conf, systemJobDir);
+
+        int numReduceTasks = job.getNumReduceTasks();
+        if (numReduceTasks > 1 || numReduceTasks < 0) {
+          // we only allow 0 or 1 reducer in local mode
+          numReduceTasks = 1;
+          job.setNumReduceTasks(1);
+        }
+        outputCommitter.setupJob(jContext);
+        status.setSetupProgress(1.0f);
+
+        Map<TaskAttemptID, TezTaskOutput> mapOutputFiles =
+            Collections.synchronizedMap(new HashMap<TaskAttemptID, TezTaskOutput>());
+
+        List<MapTaskRunnable> taskRunnables = getMapTaskRunnables(taskSplitMetaInfos,
+            jobId, mapOutputFiles);
+        ExecutorService mapService = createMapExecutor(taskRunnables.size());
+
+        // Start populating the executor with work units.
+        // They may begin running immediately (in other threads).
+        for (Runnable r : taskRunnables) {
+          mapService.submit(r);
+        }
+
+        try {
+          mapService.shutdown(); // Instructs queue to drain.
+
+          // Wait for tasks to finish; do not use a time-based timeout.
+          // (See http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=6179024)
+          LOG.info("Waiting for map tasks");
+          mapService.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
+        } catch (InterruptedException ie) {
+          // Cancel all threads.
+          mapService.shutdownNow();
+          throw ie;
+        }
+
+        LOG.info("Map task executor complete.");
+
+        // After waiting for the map tasks to complete, if any of these
+        // have thrown an exception, rethrow it now in the main thread context.
+        for (MapTaskRunnable r : taskRunnables) {
+          if (r.storedException != null) {
+            throw new Exception(r.storedException);
+          }
+        }
+
+        TaskAttemptID reduceId = new TaskAttemptID(new TaskID(
+            jobId, TaskType.REDUCE, 0), 0);
+        LOG.info("Starting task: " + reduceId);
+        try {
+          if (numReduceTasks > 0) {
+            String user =
+                UserGroupInformation.getCurrentUser().getShortUserName();
+            JobConf localConf = new JobConf(job);
+            localConf.setUser(user);
+            localConf.set("mapreduce.jobtracker.address", "local");
+            localConf.setInt(TezJobConfig.TEZ_ENGINE_TASK_INDEGREE, mapIds.size());
+            setupChildMapredLocalDirs(reduceId, user, localConf);
+
+            TezEngineTask taskContext = new TezEngineTask(
+                IDConverter.fromMRTaskAttemptId(reduceId), user,
+                localConf.getJobName(), "TODO_vertexName", LocalFinalTask.class.getName(),
+                Collections.singletonList(new InputSpec("TODO_srcVertexName",
+                    mapIds.size())), null);
+            Injector injector = Guice.createInjector(new LocalFinalTask());
+
+            // move map output to reduce input
+            for (int i = 0; i < mapIds.size(); i++) {
+              if (!this.isInterrupted()) {
+                TaskAttemptID mapId = mapIds.get(i);
+                LOG.info("XXX mapId: " + i +
+                    " LOCAL_DIR = " +
+                    mapOutputFiles.get(mapId).getConf().get(
+                        TezJobConfig.LOCAL_DIR));
+                Path mapOut = mapOutputFiles.get(mapId).getOutputFile();
+                TezTaskOutput localOutputFile = new TezLocalTaskOutputFiles();
+                localOutputFile.setConf(localConf);
+                Path reduceIn =
+                    localOutputFile.getInputFileForWrite(
+                        IDConverter.fromMRTaskId(mapId.getTaskID()),
+                        localFs.getFileStatus(mapOut).getLen());
+                if (!localFs.mkdirs(reduceIn.getParent())) {
+                  throw new IOException("Mkdirs failed to create "
+                      + reduceIn.getParent().toString());
+                }
+                if (!localFs.rename(mapOut, reduceIn)) {
+                  throw new IOException("Couldn't rename " + mapOut);
+                }
+              } else {
+                throw new InterruptedException();
+              }
+            }
+            if (!this.isInterrupted()) {
+              // Pair the increment with a decrement in finally so a failing
+              // reduce does not leave the running-reduce count inflated.
+              reduce_tasks.incrementAndGet();
+              try {
+                myMetrics.launchReduce(reduceId);
+                TezEngineFactory factory = injector.getInstance(TezEngineFactory.class);
+                Task t = factory.createTask(taskContext);
+                t.initialize(localConf, Job.this);
+                t.run();
+                myMetrics.completeReduce(reduceId);
+              } finally {
+                reduce_tasks.decrementAndGet();
+              }
+            } else {
+              throw new InterruptedException();
+            }
+          }
+        } finally {
+          for (TezTaskOutput output : mapOutputFiles.values()) {
+            output.removeAll();
+          }
+        }
+        // delete the temporary directory in output directory
+        // FIXME
+        //outputCommitter.commitJob(jContext);
+        status.setCleanupProgress(1.0f);
+
+        if (killed) {
+          this.status.setRunState(JobStatus.KILLED);
+        } else {
+          this.status.setRunState(JobStatus.SUCCEEDED);
+        }
+
+        JobEndNotifier.localRunnerNotification(job, status);
+
+      } catch (Throwable t) {
+        try {
+          outputCommitter.abortJob(jContext,
+              org.apache.hadoop.mapreduce.JobStatus.State.FAILED);
+        } catch (IOException ioe) {
+          LOG.info("Error cleaning up job:" + id, ioe);
+        }
+        status.setCleanupProgress(1.0f);
+        if (killed) {
+          this.status.setRunState(JobStatus.KILLED);
+        } else {
+          this.status.setRunState(JobStatus.FAILED);
+        }
+        LOG.warn(id, t);
+
+        JobEndNotifier.localRunnerNotification(job, status);
+
+      } finally {
+        try {
+          fs.delete(systemJobFile.getParent(), true);  // delete submit dir
+          localFs.delete(localJobFile, true);              // delete local copy
+          // Cleanup distributed cache
+          localDistributedCacheManager.close();
+        } catch (IOException e) {
+          LOG.warn("Error cleaning up "+id+": "+e);
+        }
+      }
+    }
+
+    /**
+     * Finds the position of a map attempt in {@link #mapIds}.
+     *
+     * <p>Tasks report with Tez attempt ids while mapIds holds MR attempt ids,
+     * so each entry is converted before comparing (the two id types are never
+     * equal to each other directly).
+     *
+     * @param taskId the Tez attempt id reported by a task
+     * @return the index into mapIds / partialMapProgress / mapCounters, or -1
+     *     if the id does not belong to a map of this job
+     */
+    private int mapIndexOf(TezTaskAttemptID taskId) {
+      // Iterating a synchronizedList requires holding its lock.
+      synchronized (mapIds) {
+        for (int i = 0; i < mapIds.size(); i++) {
+          if (IDConverter.fromMRTaskAttemptId(mapIds.get(i)).equals(taskId)) {
+            return i;
+          }
+        }
+      }
+      return -1;
+    }
+
+    // TaskUmbilicalProtocol methods
+    @Override
+    public ContainerTask getTask(ContainerContext containerContext)
+        throws IOException {
+      return null;
+    }
+
+    /**
+     * Records progress and counters reported by a task and recomputes the
+     * job-level map (average over all maps) or reduce progress.
+     *
+     * @return always true
+     */
+    @Override
+    public synchronized boolean statusUpdate(TezTaskAttemptID taskId,
+        TezTaskStatus taskStatus) throws IOException, InterruptedException {
+      LOG.info(taskStatus.getStateString());
+      int taskIndex = mapIndexOf(taskId);
+      if (taskIndex >= 0) {                       // mapping
+        float numTasks = (float) this.numMapTasks;
+
+        partialMapProgress[taskIndex] = taskStatus.getProgress();
+        mapCounters[taskIndex] = taskStatus.getCounters();
+
+        float partialProgress = 0.0f;
+        for (float f : partialMapProgress) {
+          partialProgress += f;
+        }
+        status.setMapProgress(partialProgress / numTasks);
+      } else {
+        reduceCounters = taskStatus.getCounters();
+        status.setReduceProgress(taskStatus.getProgress());
+      }
+
+      // ignore phase
+      return true;
+    }
+
+    /** Return the current values of the counters for this job,
+     * including tasks that are in progress.
+     *
+     * <p>The result is always a freshly allocated object; the per-task
+     * counters (and the shared empty placeholder) are never modified.
+     */
+    public synchronized TezCounters getCurrentCounters() {
+      TezCounters current = new TezCounters();
+      if (null == mapCounters) {
+        // Counters not yet initialized for job.
+        return current;
+      }
+
+      for (TezCounters c : mapCounters) {
+        if (c != null) {
+          current.incrAllCounters(c);
+        }
+      }
+      if (reduceCounters != null) {
+        current.incrAllCounters(reduceCounters);
+      }
+      return current;
+    }
+
+    /**
+     * Task is reporting that it is in commit_pending
+     * and it is waiting for the commit Response
+     */
+    @Override
+    public void commitPending(TezTaskAttemptID taskid,
+        TezTaskStatus taskStatus)
+        throws IOException, InterruptedException {
+      statusUpdate(taskid, taskStatus);
+    }
+
+    @Override
+    public void reportDiagnosticInfo(TezTaskAttemptID taskid, String trace) {
+      // Ignore for now
+    }
+
+    @Override
+    public boolean ping(TezTaskAttemptID taskid) throws IOException {
+      return true;
+    }
+
+    @Override
+    public boolean canCommit(TezTaskAttemptID taskid) throws IOException {
+      return true;
+    }
+
+    /** Marks map or reduce progress as complete for the reporting task. */
+    @Override
+    public void done(TezTaskAttemptID taskId) throws IOException {
+      int taskIndex = mapIndexOf(taskId);
+      if (taskIndex >= 0) {                       // mapping
+        status.setMapProgress(1.0f);
+      } else {
+        status.setReduceProgress(1.0f);
+      }
+    }
+
+    @Override
+    public synchronized void fsError(TezTaskAttemptID taskId, String message)
+        throws IOException {
+      LOG.fatal("FSError: "+ message + " from task: " + taskId);
+    }
+
+    @Override
+    public void shuffleError(TezTaskAttemptID taskId, String message)
+        throws IOException {
+      LOG.fatal("shuffleError: "+ message + " from task: " + taskId);
+    }
+
+    @Override
+    public synchronized void fatalError(TezTaskAttemptID taskId, String msg)
+        throws IOException {
+      LOG.fatal("Fatal: "+ msg + " from task: " + taskId);
+    }
+
+    @Override
+    public TezTaskDependencyCompletionEventsUpdate
+    getDependentTasksCompletionEvents(
+        int fromEventIdx, int maxEventsToFetch,
+        TezTaskAttemptID reduce) {
+      throw new UnsupportedOperationException(
+          "getDependentTasksCompletionEvents not supported in LocalJobRunner");
+    }
+
+    @Override
+    public void outputReady(TezTaskAttemptID taskAttemptId,
+        OutputContext outputContext) throws IOException {
+      // Ignore for now.
+    }
+
+    @Override
+    public ProceedToCompletionResponse proceedToCompletion(
+        TezTaskAttemptID taskAttemptId) throws IOException {
+      // TODO TEZAM5 Really depends on the module - inmem shuffle or not.
+      return new ProceedToCompletionResponse(true, true);
+    }
+  }
+
+  public LocalJobRunner(Configuration conf) throws IOException {
+    this(new JobConf(conf));
+  }
+
+  @Deprecated
+  public LocalJobRunner(JobConf conf) throws IOException {
+    this.fs = FileSystem.getLocal(conf);
+    this.conf = conf;
+    myMetrics = new LocalJobRunnerMetrics(new JobConf(conf));
+  }
+
+  // JobSubmissionProtocol methods
+
+  private static int jobid = 0;
+  public synchronized org.apache.hadoop.mapreduce.JobID getNewJobID() {
+    return new org.apache.hadoop.mapreduce.JobID("local", ++jobid);
+  }
+
+  /** Creates and starts a local job; returns its initial status. */
+  public org.apache.hadoop.mapreduce.JobStatus submitJob(
+      org.apache.hadoop.mapreduce.JobID jobid, String jobSubmitDir,
+      Credentials credentials) throws IOException {
+    Job job = new Job(JobID.downgrade(jobid), jobSubmitDir);
+    job.job.setCredentials(credentials);
+    return job.status;
+
+  }
+
+  /**
+   * Marks the job as killed and interrupts its thread. Unknown job ids are
+   * logged and otherwise ignored.
+   */
+  public void killJob(org.apache.hadoop.mapreduce.JobID id) {
+    Job job = jobs.get(JobID.downgrade(id));
+    if (job == null) {
+      LOG.warn("killJob: unknown job " + id);
+      return;
+    }
+    job.killed = true;
+    job.interrupt();
+  }
+
+  public void setJobPriority(org.apache.hadoop.mapreduce.JobID id,
+      String jp) throws IOException {
+    throw new UnsupportedOperationException("Changing job priority " +
+        "in LocalJobRunner is not supported.");
+  }
+
+  /** Throws {@link UnsupportedOperationException} */
+  public boolean killTask(org.apache.hadoop.mapreduce.TaskAttemptID taskId,
+      boolean shouldFail) throws IOException {
+    throw new UnsupportedOperationException("Killing tasks in " +
+        "LocalJobRunner is not supported");
+  }
+
+  public org.apache.hadoop.mapreduce.TaskReport[] getTaskReports(
+      org.apache.hadoop.mapreduce.JobID id, TaskType type) {
+    return new org.apache.hadoop.mapreduce.TaskReport[0];
+  }
+
+  /** @return the job's status, or null if the job is unknown */
+  public org.apache.hadoop.mapreduce.JobStatus getJobStatus(
+      org.apache.hadoop.mapreduce.JobID id) {
+    Job job = jobs.get(JobID.downgrade(id));
+    return (job != null) ? job.status : null;
+  }
+
+  /** @return a snapshot of the job's counters, or null if the job is unknown */
+  public org.apache.hadoop.mapreduce.Counters getJobCounters(
+      org.apache.hadoop.mapreduce.JobID id) {
+    Job job = jobs.get(JobID.downgrade(id));
+    if (job == null) {
+      return null;
+    }
+
+    return new org.apache.hadoop.mapreduce.Counters(
+        new MRCounters(job.getCurrentCounters()));
+  }
+
+  public String getFilesystemName() throws IOException {
+    return fs.getUri().toString();
+  }
+
+  public ClusterMetrics getClusterMetrics() {
+    int numMapTasks = map_tasks.get();
+    int numReduceTasks = reduce_tasks.get();
+    return new ClusterMetrics(numMapTasks, numReduceTasks, numMapTasks,
+        numReduceTasks, 0, 0, 1, 1, jobs.size(), 1, 0, 0);
+  }
+
+  public JobTrackerStatus getJobTrackerStatus() {
+    return JobTrackerStatus.RUNNING;
+  }
+
+  public long getTaskTrackerExpiryInterval()
+      throws IOException, InterruptedException {
+    return 0;
+  }
+
+  /**
+   * Get all active trackers in cluster.
+   * @return array of TaskTrackerInfo
+   */
+  public TaskTrackerInfo[] getActiveTrackers()
+      throws IOException, InterruptedException {
+    return null;
+  }
+
+  /**
+   * Get all blacklisted trackers in cluster.
+   * @return array of TaskTrackerInfo
+   */
+  public TaskTrackerInfo[] getBlacklistedTrackers()
+      throws IOException, InterruptedException {
+    return null;
+  }
+
+  public TaskCompletionEvent[] getTaskCompletionEvents(
+      org.apache.hadoop.mapreduce.JobID jobid,
+      int fromEventId, int maxEvents) throws IOException {
+    return TaskCompletionEvent.EMPTY_ARRAY;
+  }
+
+  public org.apache.hadoop.mapreduce.JobStatus[] getAllJobs() {return null;}
+
+
+  /**
+   * Returns the diagnostic information for a particular task in the given job.
+   * To be implemented
+   */
+  public String[] getTaskDiagnostics(
+      org.apache.hadoop.mapreduce.TaskAttemptID taskid) throws IOException{
+    return new String [0];
+  }
+
+  /**
+   * @see org.apache.hadoop.mapreduce.protocol.ClientProtocol#getSystemDir()
+   */
+  public String getSystemDir() {
+    Path sysDir = new Path(
+        conf.get(JTConfig.JT_SYSTEM_DIR, "/tmp/hadoop/mapred/system"));
+    return fs.makeQualified(sysDir).toString();
+  }
+
+  /**
+   * @see org.apache.hadoop.mapreduce.protocol.ClientProtocol#getQueueAdmins(String)
+   */
+  public AccessControlList getQueueAdmins(String queueName) throws IOException {
+    return new AccessControlList(" ");// no queue admins for local job runner
+  }
+
+  /**
+   * @see org.apache.hadoop.mapreduce.protocol.ClientProtocol#getStagingAreaDir()
+   */
+  public String getStagingAreaDir() throws IOException {
+    Path stagingRootDir = new Path(conf.get(JTConfig.JT_STAGING_AREA_ROOT,
+        "/tmp/hadoop/mapred/staging"));
+    UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
+    String user;
+    if (ugi != null) {
+      user = ugi.getShortUserName() + rand.nextInt();
+    } else {
+      user = "dummy" + rand.nextInt();
+    }
+    return fs.makeQualified(new Path(stagingRootDir, user+"/.staging")).toString();
+  }
+
+  public String getJobHistoryDir() {
+    return null;
+  }
+
+  @Override
+  public QueueInfo[] getChildQueues(String queueName) throws IOException {
+    return null;
+  }
+
+  @Override
+  public QueueInfo[] getRootQueues() throws IOException {
+    return null;
+  }
+
+  @Override
+  public QueueInfo[] getQueues() throws IOException {
+    return null;
+  }
+
+
+  @Override
+  public QueueInfo getQueue(String queue) throws IOException {
+    return null;
+  }
+
+  @Override
+  public org.apache.hadoop.mapreduce.QueueAclsInfo[]
+      getQueueAclsForCurrentUser() throws IOException{
+    return null;
+  }
+
+  /**
+   * Set the max number of map tasks to run concurrently in the LocalJobRunner.
+   * @param job the job to configure
+   * @param maxMaps the maximum number of map tasks to allow.
+   */
+  public static void setLocalMaxRunningMaps(
+      org.apache.hadoop.mapreduce.JobContext job,
+      int maxMaps) {
+    job.getConfiguration().setInt(LOCAL_MAX_MAPS, maxMaps);
+  }
+
+  /**
+   * @return the max number of map tasks to run concurrently in the
+   * LocalJobRunner.
+   */
+  public static int getLocalMaxRunningMaps(
+      org.apache.hadoop.mapreduce.JobContext job) {
+    return job.getConfiguration().getInt(LOCAL_MAX_MAPS, 1);
+  }
+
+  @Override
+  public void cancelDelegationToken(Token<DelegationTokenIdentifier> token
+                                    ) throws IOException,
+                                             InterruptedException {
+  }
+
+  @Override
+  public Token<DelegationTokenIdentifier>
+     getDelegationToken(Text renewer) throws IOException, InterruptedException {
+    return null;
+  }
+
+  @Override
+  public long renewDelegationToken(Token<DelegationTokenIdentifier> token
+                                   ) throws IOException,InterruptedException{
+    return 0;
+  }
+
+  @Override
+  public LogParams getLogFileParams(org.apache.hadoop.mapreduce.JobID jobID,
+      org.apache.hadoop.mapreduce.TaskAttemptID taskAttemptID)
+      throws IOException, InterruptedException {
+    throw new UnsupportedOperationException("Not supported");
+  }
+
+  /**
+   * Rewrites {@link TezJobConfig#LOCAL_DIR} in {@code conf} so that every
+   * configured local dir points at a per-task subdirectory. It also selects
+   * {@link TezLocalTaskOutputFiles} as the task output manager.
+   *
+   * @param taskAttemptID the attempt whose job/task ids name the subdirectory
+   * @param user the user name embedded in the path
+   * @param conf the task configuration to modify in place
+   */
+  static void setupChildMapredLocalDirs(
+      TaskAttemptID taskAttemptID, String user, JobConf conf) {
+    String[] localDirs =
+        conf.getTrimmedStrings(
+            TezJobConfig.LOCAL_DIR, TezJobConfig.DEFAULT_LOCAL_DIR);
+    String jobId = taskAttemptID.getJobID().toString();
+    String taskId = taskAttemptID.getTaskID().toString();
+    boolean isCleanup = false;
+    String taskDir = getLocalTaskDir(user, jobId, taskId, isCleanup);
+    StringBuilder childMapredLocalDir = new StringBuilder();
+    for (int i = 0; i < localDirs.length; i++) {
+      if (i > 0) {
+        childMapredLocalDir.append(',');
+      }
+      childMapredLocalDir.append(localDirs[i]).append(Path.SEPARATOR)
+          .append(taskDir);
+    }
+    LOG.info(TezJobConfig.LOCAL_DIR + " for child : " + taskAttemptID +
+        " is " + childMapredLocalDir);
+    conf.set(TezJobConfig.LOCAL_DIR, childMapredLocalDir.toString());
+    conf.setClass(Constants.TEZ_ENGINE_TASK_OUTPUT_MANAGER,
+        TezLocalTaskOutputFiles.class, TezTaskOutput.class);
+  }
+
+  static final String TASK_CLEANUP_SUFFIX = ".cleanup";
+  static final String SUBDIR = jobDir;
+  static final String JOBCACHE = "jobcache";
+
+  /**
+   * Builds the relative per-task directory
+   * {@code SUBDIR/user/jobcache/jobid/taskid}, with {@link #TASK_CLEANUP_SUFFIX}
+   * appended for cleanup attempts.
+   */
+  static String getLocalTaskDir(String user, String jobid, String taskid,
+      boolean isCleanupAttempt) {
+    String taskDir = SUBDIR + Path.SEPARATOR + user + Path.SEPARATOR + JOBCACHE
+        + Path.SEPARATOR + jobid + Path.SEPARATOR + taskid;
+    if (isCleanupAttempt) {
+      taskDir = taskDir + TASK_CLEANUP_SUFFIX;
+    }
+    return taskDir;
+  }
+
+
+}
Propchange: incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/LocalJobRunner.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/LocalJobRunnerMetrics.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/LocalJobRunnerMetrics.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/LocalJobRunnerMetrics.java (added)
+++ incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/LocalJobRunnerMetrics.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import org.apache.hadoop.metrics.MetricsContext;
+import org.apache.hadoop.metrics.MetricsRecord;
+import org.apache.hadoop.metrics.MetricsUtil;
+import org.apache.hadoop.metrics.Updater;
+import org.apache.hadoop.metrics.jvm.JvmMetrics;
+
+@SuppressWarnings("deprecation")
+class LocalJobRunnerMetrics implements Updater {
+ private final MetricsRecord metricsRecord;
+
+ private int numMapTasksLaunched = 0;
+ private int numMapTasksCompleted = 0;
+ private int numReduceTasksLaunched = 0;
+ private int numReduceTasksCompleted = 0;
+ private int numWaitingMaps = 0;
+ private int numWaitingReduces = 0;
+
+ public LocalJobRunnerMetrics(JobConf conf) {
+ String sessionId = conf.getSessionId();
+ // Initiate JVM Metrics
+ JvmMetrics.init("JobTracker", sessionId);
+ // Create a record for map-reduce metrics
+ MetricsContext context = MetricsUtil.getContext("mapred");
+ // record name is jobtracker for compatibility
+ metricsRecord = MetricsUtil.createRecord(context, "jobtracker");
+ metricsRecord.setTag("sessionId", sessionId);
+ context.registerUpdater(this);
+ }
+
+ /**
+ * Since this object is a registered updater, this method will be called
+ * periodically, e.g. every 5 seconds.
+ */
+ public void doUpdates(MetricsContext unused) {
+ synchronized (this) {
+ metricsRecord.incrMetric("maps_launched", numMapTasksLaunched);
+ metricsRecord.incrMetric("maps_completed", numMapTasksCompleted);
+ metricsRecord.incrMetric("reduces_launched", numReduceTasksLaunched);
+ metricsRecord.incrMetric("reduces_completed", numReduceTasksCompleted);
+ metricsRecord.incrMetric("waiting_maps", numWaitingMaps);
+ metricsRecord.incrMetric("waiting_reduces", numWaitingReduces);
+
+ numMapTasksLaunched = 0;
+ numMapTasksCompleted = 0;
+ numReduceTasksLaunched = 0;
+ numReduceTasksCompleted = 0;
+ numWaitingMaps = 0;
+ numWaitingReduces = 0;
+ }
+ metricsRecord.update();
+ }
+
+ public synchronized void launchMap(TaskAttemptID taskAttemptID) {
+ ++numMapTasksLaunched;
+ decWaitingMaps(taskAttemptID.getJobID(), 1);
+ }
+
+ public synchronized void completeMap(TaskAttemptID taskAttemptID) {
+ ++numMapTasksCompleted;
+ }
+
+ public synchronized void launchReduce(TaskAttemptID taskAttemptID) {
+ ++numReduceTasksLaunched;
+ decWaitingReduces(taskAttemptID.getJobID(), 1);
+ }
+
+ public synchronized void completeReduce(TaskAttemptID taskAttemptID) {
+ ++numReduceTasksCompleted;
+ }
+
+ private synchronized void decWaitingMaps(JobID id, int task) {
+ numWaitingMaps -= task;
+ }
+
+ private synchronized void decWaitingReduces(JobID id, int task){
+ numWaitingReduces -= task;
+ }
+
+}
Propchange: incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/LocalJobRunnerMetrics.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/hadoop/mapreduce/split/SplitMetaInfoReaderTez.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/hadoop/mapreduce/split/SplitMetaInfoReaderTez.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/hadoop/mapreduce/split/SplitMetaInfoReaderTez.java (added)
+++ incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/hadoop/mapreduce/split/SplitMetaInfoReaderTez.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.split;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Arrays;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
+import org.apache.tez.mapreduce.hadoop.MRJobConfig;
+
+/**
+ * A utility that reads the split meta info and creates split meta info objects
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class SplitMetaInfoReaderTez {
+
+ public static final Log LOG = LogFactory.getLog(SplitMetaInfoReaderTez.class);
+
+ // Forked from the MR variant so that the metaInfo file as well as the split
+ // file can be read from local fs - relying on these files being localized.
+ public static TaskSplitMetaInfo[] readSplitMetaInfo(Configuration conf,
+ FileSystem fs) throws IOException {
+
+ long maxMetaInfoSize = conf.getLong(
+ MRJobConfig.SPLIT_METAINFO_MAXSIZE,
+ MRJobConfig.DEFAULT_SPLIT_METAINFO_MAXSIZE);
+
+ Path metaSplitFile = new Path(MRJobConfig.JOB_SPLIT_METAINFO);
+ String jobSplitFile = MRJobConfig.JOB_SPLIT;
+
+ File file = new File(metaSplitFile.toUri().getPath()).getAbsoluteFile();
+ LOG.info("DEBUG: Setting up JobSplitIndex with JobSplitFile at: "
+ + file.getAbsolutePath() + ", defaultFS from conf: "
+ + FileSystem.getDefaultUri(conf));
+
+ FileStatus fStatus = fs.getFileStatus(metaSplitFile);
+ if (maxMetaInfoSize > 0 && fStatus.getLen() > maxMetaInfoSize) {
+ throw new IOException("Split metadata size exceeded " + maxMetaInfoSize
+ + ". Aborting job ");
+ }
+ FSDataInputStream in = fs.open(metaSplitFile);
+ byte[] header = new byte[JobSplit.META_SPLIT_FILE_HEADER.length];
+ in.readFully(header);
+ if (!Arrays.equals(JobSplit.META_SPLIT_FILE_HEADER, header)) {
+ throw new IOException("Invalid header on split file");
+ }
+ int vers = WritableUtils.readVInt(in);
+ if (vers != JobSplit.META_SPLIT_VERSION) {
+ in.close();
+ throw new IOException("Unsupported split version " + vers);
+ }
+ int numSplits = WritableUtils.readVInt(in); // TODO: check for insane values
+ JobSplit.TaskSplitMetaInfo[] allSplitMetaInfo = new JobSplit.TaskSplitMetaInfo[numSplits];
+ for (int i = 0; i < numSplits; i++) {
+ JobSplit.SplitMetaInfo splitMetaInfo = new JobSplit.SplitMetaInfo();
+ splitMetaInfo.readFields(in);
+ JobSplit.TaskSplitIndex splitIndex = new JobSplit.TaskSplitIndex(
+ jobSplitFile, splitMetaInfo.getStartOffset());
+ allSplitMetaInfo[i] = new JobSplit.TaskSplitMetaInfo(splitIndex,
+ splitMetaInfo.getLocations(), splitMetaInfo.getInputDataLength());
+ }
+ in.close();
+ return allSplitMetaInfo;
+ }
+
+}
Propchange: incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/hadoop/mapreduce/split/SplitMetaInfoReaderTez.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/ContainerContext.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/ContainerContext.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/ContainerContext.java (added)
+++ incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/ContainerContext.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,70 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.mapreduce.hadoop;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.util.Records;
+import org.apache.tez.records.TezContainerId;
+
+// TODO EVENTUALLY move this over to PB. Fix package/module.
+// TODO EVENTUALLY unit tests for functionality.
+public class ContainerContext implements Writable {
+
+ ContainerId containerId;
+ String pid;
+
+ public ContainerContext() {
+ containerId = Records.newRecord(ContainerId.class);
+ pid = "";
+ }
+
+ public ContainerContext(ContainerId containerId, String pid) {
+ this.containerId = containerId;
+ this.pid = pid;
+ }
+
+ public ContainerId getContainerId() {
+ return containerId;
+ }
+
+ public String getPid() {
+ return pid;
+ }
+
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ TezContainerId tezContainerId = new TezContainerId();
+ tezContainerId.readFields(in);
+ this.containerId = tezContainerId.getContainerId();
+ this.pid = Text.readString(in);
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ TezContainerId tezContainerId = new TezContainerId(containerId);
+ tezContainerId.write(out);
+ Text.writeString(out, pid);
+ }
+}
Propchange: incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/ContainerContext.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/DeprecatedKeys.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/DeprecatedKeys.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/DeprecatedKeys.java (added)
+++ incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/DeprecatedKeys.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,96 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.mapreduce.hadoop;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.tez.common.TezJobConfig;
+
+
+public class DeprecatedKeys {
+ static {
+ addDeprecatedKeys();
+ }
+
+ // TODO TEZAM4 Sometime, make sure this gets loaded by default. Insteaf of the current initialization in MRAppMaster, TezChild.
+ // Maybe define in an TEZConfiguration / TEZ JobConf variant.
+
+ public static void init() {
+ }
+
+ private static void addDeprecatedKeys() {
+
+ _(MRConfig.MAPRED_IFILE_READAHEAD, TezJobConfig.TEZ_ENGINE_IFILE_READAHEAD);
+
+ _(MRConfig.MAPRED_IFILE_READAHEAD_BYTES, TezJobConfig.TEZ_ENGINE_IFILE_READAHEAD_BYTES);
+
+ _(MRJobConfig.RECORDS_BEFORE_PROGRESS, TezJobConfig.RECORDS_BEFORE_PROGRESS);
+
+ _(MRJobConfig.JOB_LOCAL_DIR, MRConfig.LOCAL_DIR);
+
+ _(MRJobConfig.NUM_REDUCES, TezJobConfig.TEZ_ENGINE_TASK_OUTDEGREE);
+
+ _(MRJobConfig.NUM_MAPS, TezJobConfig.TEZ_ENGINE_TASK_INDEGREE);
+
+ _(MRJobConfig.IO_SORT_FACTOR, TezJobConfig.TEZ_ENGINE_IO_SORT_FACTOR);
+
+ _(MRJobConfig.MAP_SORT_SPILL_PERCENT, TezJobConfig.TEZ_ENGINE_SORT_SPILL_PERCENT);
+
+ _(MRJobConfig.IO_SORT_MB, TezJobConfig.TEZ_ENGINE_IO_SORT_MB);
+
+ _(MRJobConfig.INDEX_CACHE_MEMORY_LIMIT, TezJobConfig.TEZ_ENGINE_INDEX_CACHE_MEMORY_LIMIT_BYTES);
+
+ _(MRJobConfig.MAP_COMBINE_MIN_SPILLS, TezJobConfig.TEZ_ENGINE_COMBINE_MIN_SPILLS);
+
+ _(MRJobConfig.COUNTERS_MAX_KEY, TezJobConfig.COUNTERS_MAX_KEY);
+
+ _(MRJobConfig.COUNTER_GROUP_NAME_MAX_KEY, TezJobConfig.COUNTER_GROUP_NAME_MAX_KEY);
+
+ _(MRJobConfig.COUNTER_NAME_MAX_KEY, TezJobConfig.COUNTER_NAME_MAX_KEY);
+
+ _(MRJobConfig.COUNTER_GROUPS_MAX_KEY, TezJobConfig.COUNTER_GROUPS_MAX_KEY);
+
+ _(MRJobConfig.SHUFFLE_PARALLEL_COPIES, TezJobConfig.TEZ_ENGINE_SHUFFLE_PARALLEL_COPIES);
+
+ _(MRJobConfig.SHUFFLE_FETCH_FAILURES, TezJobConfig.TEZ_ENGINE_SHUFFLE_FETCH_FAILURES);
+
+ _(MRJobConfig.SHUFFLE_NOTIFY_READERROR, TezJobConfig.TEZ_ENGINE_SHUFFLE_NOTIFY_READERROR);
+
+ _(MRJobConfig.SHUFFLE_CONNECT_TIMEOUT, TezJobConfig.TEZ_ENGINE_SHUFFLE_CONNECT_TIMEOUT);
+
+ _(MRJobConfig.SHUFFLE_READ_TIMEOUT, TezJobConfig.TEZ_ENGINE_SHUFFLE_READ_TIMEOUT);
+
+ _(MRConfig.SHUFFLE_SSL_ENABLED_KEY, TezJobConfig.TEZ_ENGINE_SHUFFLE_ENABLE_SSL);
+
+ _(MRJobConfig.SHUFFLE_INPUT_BUFFER_PERCENT, TezJobConfig.TEZ_ENGINE_SHUFFLE_INPUT_BUFFER_PERCENT);
+
+ _(MRJobConfig.SHUFFLE_MEMORY_LIMIT_PERCENT, TezJobConfig.TEZ_ENGINE_SHUFFLE_MEMORY_LIMIT_PERCENT);
+
+ _(MRJobConfig.SHUFFLE_MERGE_PERCENT, TezJobConfig.TEZ_ENGINE_SHUFFLE_MERGE_PERCENT);
+
+ _(MRJobConfig.REDUCE_MEMTOMEM_THRESHOLD, TezJobConfig.TEZ_ENGINE_SHUFFLE_MEMTOMEM_SEGMENTS);
+
+ _(MRJobConfig.REDUCE_MEMTOMEM_ENABLED, TezJobConfig.TEZ_ENGINE_SHUFFLE_ENABLE_MEMTOMEM);
+
+ _(MRJobConfig.SHUFFLE_INPUT_BUFFER_PERCENT, TezJobConfig.TEZ_ENGINE_INPUT_BUFFER_PERCENT);
+ }
+
+ private static void _(String oldKey, String newKey) {
+ Configuration.addDeprecation(oldKey, newKey);
+ }
+}
Propchange: incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/DeprecatedKeys.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/IDConverter.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/IDConverter.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/IDConverter.java (added)
+++ incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/IDConverter.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,96 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.mapreduce.hadoop;
+
+import org.apache.hadoop.mapred.JobID;
+import org.apache.hadoop.mapred.TaskAttemptID;
+import org.apache.hadoop.mapred.TaskID;
+import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.util.BuilderUtils;
+import org.apache.tez.engine.records.TezDAGID;
+import org.apache.tez.engine.records.TezTaskAttemptID;
+import org.apache.tez.engine.records.TezTaskID;
+import org.apache.tez.engine.records.TezVertexID;
+
+public class IDConverter {
+
+ // FIXME hardcoded assumption that one app is one dag
+ public static JobID toMRJobId(TezDAGID dagId) {
+ return new JobID(
+ Long.toString(dagId.getApplicationId().getClusterTimestamp()),
+ dagId.getApplicationId().getId());
+ }
+
+ public static TaskID toMRTaskId(TezTaskID taskid) {
+ return new TaskID(
+ toMRJobId(taskid.getVertexID().getDAGId()),
+ taskid.getVertexID().getId() == 0 ? TaskType.MAP : TaskType.REDUCE,
+ taskid.getId());
+ }
+
+ public static TaskAttemptID toMRTaskAttemptId(
+ TezTaskAttemptID taskAttemptId) {
+ return new TaskAttemptID(
+ toMRTaskId(taskAttemptId.getTaskID()),
+ taskAttemptId.getId());
+ }
+
+ // FIXME hardcoded assumption that one app is one dag
+ public static TezDAGID fromMRJobId(
+ org.apache.hadoop.mapreduce.JobID jobId) {
+ return new TezDAGID(BuilderUtils.newApplicationId(
+ Long.valueOf(jobId.getJtIdentifier()), jobId.getId()), 1);
+ }
+
+ public static MRTaskType fromMRTaskType(TaskType type) {
+ switch (type) {
+ case REDUCE:
+ return MRTaskType.REDUCE;
+ case JOB_SETUP:
+ return MRTaskType.JOB_SETUP;
+ case JOB_CLEANUP:
+ return MRTaskType.JOB_CLEANUP;
+ case TASK_CLEANUP:
+ return MRTaskType.TASK_CLEANUP;
+ case MAP:
+ return MRTaskType.MAP;
+ default:
+ throw new RuntimeException("Unknown TaskType: " + type);
+ }
+ }
+
+ // FIXME hack alert converting objects with hard coded id
+ public static TezTaskID
+ fromMRTaskId(org.apache.hadoop.mapreduce.TaskID taskid) {
+ return new TezTaskID(
+ new TezVertexID(fromMRJobId(taskid.getJobID()),
+ (taskid.getTaskType() == TaskType.MAP ? 0 : 1)
+ ),
+ taskid.getId());
+ }
+
+ public static TezTaskAttemptID fromMRTaskAttemptId(
+ org.apache.hadoop.mapreduce.TaskAttemptID taskAttemptId) {
+ return new TezTaskAttemptID(
+ fromMRTaskId(taskAttemptId.getTaskID()),
+ taskAttemptId.getId());
+ }
+
+}
Propchange: incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/IDConverter.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRConfig.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRConfig.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRConfig.java (added)
+++ incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRConfig.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.mapreduce.hadoop;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+
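/**
 * Placeholder for cluster-level configuration keys.
 *
 * Not all keys share the "mapreduce.cluster." prefix: several use
 * "mapreduce.job.", "mapreduce.jobtracker.", "mapreduce.framework.",
 * "mapreduce.task.", "mapreduce.shuffle." or "mapreduce.ifile." instead.
 */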
+@InterfaceAudience.Private
public interface MRConfig {
  // Interface fields are implicitly public static final.

  // ---- Cluster-level configuration parameters ----
  String TEMP_DIR = "mapreduce.cluster.temp.dir";
  String LOCAL_DIR = "mapreduce.cluster.local.dir";
  String MAPMEMORY_MB = "mapreduce.cluster.mapmemory.mb";
  String REDUCEMEMORY_MB = "mapreduce.cluster.reducememory.mb";
  String MR_ACLS_ENABLED = "mapreduce.cluster.acls.enabled";
  String MR_ADMINS = "mapreduce.cluster.administrators";
  @Deprecated
  String MR_SUPERGROUP = "mapreduce.cluster.permissions.supergroup";

  // ---- Delegation-token keys; the defaults are milliseconds ----
  String DELEGATION_KEY_UPDATE_INTERVAL_KEY =
      "mapreduce.cluster.delegation.key.update-interval";
  long DELEGATION_KEY_UPDATE_INTERVAL_DEFAULT = 24L * 60 * 60 * 1000; // 1 day
  String DELEGATION_TOKEN_RENEW_INTERVAL_KEY =
      "mapreduce.cluster.delegation.token.renew-interval";
  long DELEGATION_TOKEN_RENEW_INTERVAL_DEFAULT = 24L * 60 * 60 * 1000; // 1 day
  String DELEGATION_TOKEN_MAX_LIFETIME_KEY =
      "mapreduce.cluster.delegation.token.max-lifetime";
  long DELEGATION_TOKEN_MAX_LIFETIME_DEFAULT = 7L * 24 * 60 * 60 * 1000; // 7 days

  // ---- Job-scoped keys ----
  String RESOURCE_CALCULATOR_PROCESS_TREE = "mapreduce.job.process-tree.class";
  String STATIC_RESOLUTIONS = "mapreduce.job.net.static.resolutions";

  // ---- JobTracker keys ----
  String MASTER_ADDRESS = "mapreduce.jobtracker.address";
  String MASTER_USER_NAME = "mapreduce.jobtracker.kerberos.principal";

  // ---- Framework selection: key and recognized values ----
  String FRAMEWORK_NAME = "mapreduce.framework.name";
  String CLASSIC_FRAMEWORK_NAME = "classic";
  String YARN_TEZ_FRAMEWORK_NAME = "yarn-tez";
  String LOCAL_FRAMEWORK_NAME = "local";

  // ---- Task keys ----
  String TASK_LOCAL_OUTPUT_CLASS = "mapreduce.task.local.output.class";

  String PROGRESS_STATUS_LEN_LIMIT_KEY = "mapreduce.task.max.status.length";
  int PROGRESS_STATUS_LEN_LIMIT_DEFAULT = 512;

  int MAX_BLOCK_LOCATIONS_DEFAULT = 10;
  String MAX_BLOCK_LOCATIONS_KEY = "mapreduce.job.max.split.locations";

  // ---- Shuffle ----
  String SHUFFLE_SSL_ENABLED_KEY = "mapreduce.shuffle.ssl.enabled";
  boolean SHUFFLE_SSL_ENABLED_DEFAULT = false;

  /** Configuration key to enable/disable IFile readahead. */
  String MAPRED_IFILE_READAHEAD = "mapreduce.ifile.readahead";
  boolean DEFAULT_MAPRED_IFILE_READAHEAD = true;

  /** Configuration key to set the IFile readahead length in bytes. */
  String MAPRED_IFILE_READAHEAD_BYTES = "mapreduce.ifile.readahead.bytes";
  int DEFAULT_MAPRED_IFILE_READAHEAD_BYTES = 4 * 1024 * 1024; // 4 MiB
}
Propchange: incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRConfig.java
------------------------------------------------------------------------------
svn:eol-style = native