Posted to commits@tez.apache.org by hi...@apache.org on 2013/04/19 01:54:28 UTC

svn commit: r1469642 [31/36] - in /incubator/tez/branches/TEZ-1: ./ example_jobs/ example_jobs/sampleInput/ example_jobs/wc_mr_6m_1r/ example_jobs/wc_mrr_6m_3r_3r/ ljr_helper/ tez-common/ tez-common/src/ tez-common/src/main/ tez-common/src/main/java/ t...

Added: incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/LocalJobRunner.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/LocalJobRunner.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/LocalJobRunner.java (added)
+++ incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/LocalJobRunner.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,932 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.ipc.ProtocolSignature;
+import org.apache.hadoop.mapreduce.Cluster.JobTrackerStatus;
+import org.apache.hadoop.mapreduce.ClusterMetrics;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.QueueInfo;
+import org.apache.hadoop.mapreduce.TaskCompletionEvent;
+import org.apache.hadoop.mapreduce.TaskTrackerInfo;
+import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapreduce.protocol.ClientProtocol;
+import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier;
+import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
+import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
+import org.apache.hadoop.mapreduce.split.SplitMetaInfoReader;
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
+import org.apache.hadoop.mapreduce.v2.LogParams;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.authorize.AccessControlList;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.tez.common.Constants;
+import org.apache.tez.common.ContainerTask;
+import org.apache.tez.common.InputSpec;
+import org.apache.tez.common.TezEngineTask;
+import org.apache.tez.common.TezJobConfig;
+import org.apache.tez.common.TezTaskStatus;
+import org.apache.tez.common.counters.TezCounters;
+import org.apache.tez.engine.api.Task;
+import org.apache.tez.engine.common.task.local.output.TezLocalTaskOutputFiles;
+import org.apache.tez.engine.common.task.local.output.TezTaskOutput;
+import org.apache.tez.engine.records.OutputContext;
+import org.apache.tez.engine.records.TezTaskAttemptID;
+import org.apache.tez.engine.records.TezTaskDependencyCompletionEventsUpdate;
+import org.apache.tez.engine.runtime.TezEngineFactory;
+import org.apache.tez.mapreduce.hadoop.ContainerContext;
+import org.apache.tez.mapreduce.hadoop.IDConverter;
+import org.apache.tez.mapreduce.hadoop.TezTaskUmbilicalProtocol;
+import org.apache.tez.mapreduce.hadoop.mapred.MRCounters;
+import org.apache.tez.mapreduce.hadoop.records.ProceedToCompletionResponse;
+import org.apache.tez.mapreduce.task.InitialTask;
+import org.apache.tez.mapreduce.task.LocalFinalTask;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import com.google.inject.Guice;
+import com.google.inject.Injector;
+
+/** Implements MapReduce locally, in-process, for debugging. */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class LocalJobRunner implements ClientProtocol {
+  public static final Log LOG =
+    LogFactory.getLog(LocalJobRunner.class);
+
+  /** The maximum number of map tasks to run in parallel in LocalJobRunner */
+  public static final String LOCAL_MAX_MAPS =
+    "mapreduce.local.map.tasks.maximum";
+
+  private FileSystem fs;
+  private HashMap<JobID, Job> jobs = new HashMap<JobID, Job>();
+  private JobConf conf;
+  private AtomicInteger map_tasks = new AtomicInteger(0);
+  private int reduce_tasks = 0;
+  final Random rand = new Random();
+  
+  private LocalJobRunnerMetrics myMetrics = null;
+
+  private static final String jobDir =  "localRunner/";
+
+  private static final TezCounters EMPTY_COUNTERS = new TezCounters();
+
+  public long getProtocolVersion(String protocol, long clientVersion) {
+    return ClientProtocol.versionID;
+  }
+
+  @Override
+  public ProtocolSignature getProtocolSignature(String protocol,
+      long clientVersion, int clientMethodsHash) throws IOException {
+    return ProtocolSignature.getProtocolSignature(
+        this, protocol, clientVersion, clientMethodsHash);
+  }
+
+  private class Job extends Thread implements TezTaskUmbilicalProtocol {
+    // The job directory on the system: JobClient places job configurations here.
+    // This is analogous to JobTracker's system directory.
+    private Path systemJobDir;
+    private Path systemJobFile;
+    
+    // The local job directory for the task, analogous to the system job directory above.
+    private Path localJobDir;
+    private Path localJobFile;
+
+    private JobID id;
+    private JobConf job;
+
+    private int numMapTasks;
+    private float [] partialMapProgress;
+    private TezCounters [] mapCounters;
+    private TezCounters reduceCounters;
+
+    private JobStatus status;
+    private List<TaskAttemptID> mapIds = Collections.synchronizedList(
+        new ArrayList<TaskAttemptID>());
+
+    private JobProfile profile;
+    private FileSystem localFs;
+    boolean killed = false;
+    
+    private LocalDistributedCacheManager localDistributedCacheManager;
+
+    public long getProtocolVersion(String protocol, long clientVersion) {
+      return TaskUmbilicalProtocol.versionID;
+    }
+    
+    @Override
+    public ProtocolSignature getProtocolSignature(String protocol,
+        long clientVersion, int clientMethodsHash) throws IOException {
+      return ProtocolSignature.getProtocolSignature(
+          this, protocol, clientVersion, clientMethodsHash);
+    }
+
+    public Job(JobID jobid, String jobSubmitDir) throws IOException {
+      this.systemJobDir = new Path(jobSubmitDir);
+      this.systemJobFile = new Path(systemJobDir, "job.xml");
+      this.id = jobid;
+      JobConf conf = new JobConf(systemJobFile);
+      this.localFs = FileSystem.getLocal(conf);
+      this.localJobDir = localFs.makeQualified(conf.getLocalPath(jobDir));
+      this.localJobFile = new Path(this.localJobDir, id + ".xml");
+
+      // Manage the distributed cache.  If there are files to be copied,
+      // this will change the configuration, which is why localJobFile is
+      // re-written below rather than copied verbatim.
+      localDistributedCacheManager = new LocalDistributedCacheManager();
+      localDistributedCacheManager.setup(conf);
+      
+      // Write out configuration file.  Instead of copying it from
+      // systemJobFile, we re-write it, since setup(), above, may have
+      // updated it.
+      OutputStream out = localFs.create(localJobFile);
+      try {
+        conf.writeXml(out);
+      } finally {
+        out.close();
+      }
+      this.job = new JobConf(localJobFile);
+
+      // Job (the current object) is a Thread, so we wrap its class loader.
+      if (localDistributedCacheManager.hasLocalClasspaths()) {
+        setContextClassLoader(localDistributedCacheManager.makeClassLoader(
+                getContextClassLoader()));
+      }
+      
+      profile = new JobProfile(job.getUser(), id, systemJobFile.toString(), 
+                               "http://localhost:8080/", job.getJobName());
+      status = new JobStatus(id, 0.0f, 0.0f, JobStatus.RUNNING, 
+          profile.getUser(), profile.getJobName(), profile.getJobFile(), 
+          profile.getURL().toString());
+
+      jobs.put(id, this);
+
+      this.start();
+    }
+
+    /**
+     * A Runnable instance that handles a map task to be run by an executor.
+     */
+    protected class MapTaskRunnable implements Runnable {
+      private final int taskId;
+      private final TaskSplitMetaInfo info;
+      private final JobID jobId;
+      private final JobConf localConf;
+
+      // This is a reference to a shared object passed in by the
+      // external context; this delivers state to the reducers regarding
+      // where to fetch mapper outputs.
+      private final Map<TaskAttemptID, TezTaskOutput> mapOutputFiles;
+
+      public volatile Throwable storedException;
+
+      public MapTaskRunnable(TaskSplitMetaInfo info, int taskId, JobID jobId,
+          Map<TaskAttemptID, TezTaskOutput> mapOutputFiles) {
+        this.info = info;
+        this.taskId = taskId;
+        this.mapOutputFiles = mapOutputFiles;
+        this.jobId = jobId;
+        this.localConf = new JobConf(job);
+      }
+
+      public void run() {
+        try {
+          TaskAttemptID mapId = new TaskAttemptID(new TaskID(
+              jobId, TaskType.MAP, taskId), 0);
+          LOG.info("Starting task: " + mapId);
+          final String user = 
+              UserGroupInformation.getCurrentUser().getShortUserName();
+          setupChildMapredLocalDirs(mapId, user, localConf);
+          localConf.setUser(user);
+
+          TezTaskAttemptID tezMapId =
+              IDConverter.fromMRTaskAttemptId(mapId);
+          mapIds.add(mapId);
+          TezEngineTask taskContext = 
+              new TezEngineTask(
+                  tezMapId, user, localConf.getJobName(), "TODO_vertexName",
+                  InitialTask.class.getName(), null, null);
+          Injector injector = Guice.createInjector(new InitialTask());
+
+          TezTaskOutput mapOutput = new TezLocalTaskOutputFiles();
+          mapOutput.setConf(localConf);
+          mapOutputFiles.put(mapId, mapOutput);
+
+          try {
+            map_tasks.getAndIncrement();
+            myMetrics.launchMap(mapId);
+            TezEngineFactory factory = injector.getInstance(TezEngineFactory.class);
+            Task t = factory.createTask(taskContext);
+            t.initialize(localConf, Job.this);
+            t.run();
+            myMetrics.completeMap(mapId);
+          } finally {
+            map_tasks.getAndDecrement();
+          }
+
+          LOG.info("Finishing task: " + mapId);
+        } catch (Throwable e) {
+          this.storedException = e;
+        }
+      }
+    }
+
+    /**
+     * Create Runnables to encapsulate map tasks for use by the executor
+     * service.
+     * @param taskInfo Info about the map task splits
+     * @param jobId the job id
+     * @param mapOutputFiles a mapping from task attempts to output files
+     * @return a List of Runnables, one per map task.
+     */
+    protected List<MapTaskRunnable> getMapTaskRunnables(
+        TaskSplitMetaInfo [] taskInfo, JobID jobId,
+        Map<TaskAttemptID, TezTaskOutput> mapOutputFiles) {
+
+      int numTasks = 0;
+      ArrayList<MapTaskRunnable> list = new ArrayList<MapTaskRunnable>();
+      for (TaskSplitMetaInfo task : taskInfo) {
+        list.add(new MapTaskRunnable(task, numTasks++, jobId,
+            mapOutputFiles));
+      }
+
+      return list;
+    }
+
+    /**
+     * Initialize the counters that will hold partial-progress from
+     * the various task attempts.
+     * @param numMaps the number of map tasks in this job.
+     */
+    private synchronized void initCounters(int numMaps) {
+      // Initialize state trackers for all map tasks.
+      this.partialMapProgress = new float[numMaps];
+      this.mapCounters = new TezCounters[numMaps];
+      for (int i = 0; i < numMaps; i++) {
+        this.mapCounters[i] = EMPTY_COUNTERS;
+      }
+
+      this.reduceCounters = EMPTY_COUNTERS;
+    }
+
+    /**
+     * Creates the executor service used to run map tasks.
+     *
+     * @param numMapTasks the total number of map tasks to be run
+     * @return an ExecutorService instance that handles map tasks
+     */
+    protected ExecutorService createMapExecutor(int numMapTasks) {
+
+      // Determine the size of the thread pool to use
+      int maxMapThreads = job.getInt(LOCAL_MAX_MAPS, 1);
+      if (maxMapThreads < 1) {
+        throw new IllegalArgumentException(
+            "Configured " + LOCAL_MAX_MAPS + " must be >= 1");
+      }
+      this.numMapTasks = numMapTasks;
+      maxMapThreads = Math.min(maxMapThreads, this.numMapTasks);
+      maxMapThreads = Math.max(maxMapThreads, 1); // In case of no tasks.
+
+      initCounters(this.numMapTasks);
+
+      LOG.debug("Starting thread pool executor.");
+      LOG.debug("Max local threads: " + maxMapThreads);
+      LOG.debug("Map tasks to process: " + this.numMapTasks);
+
+      // Create a new executor service to drain the work queue.
+      ThreadFactory tf = new ThreadFactoryBuilder()
+        .setNameFormat("LocalJobRunner Map Task Executor #%d")
+        .build();
+      ExecutorService executor = Executors.newFixedThreadPool(maxMapThreads, tf);
+
+      return executor;
+    }
+
+    private org.apache.hadoop.mapreduce.OutputCommitter 
+    createOutputCommitter(boolean newApiCommitter, JobID jobId, Configuration conf) throws Exception {
+      org.apache.hadoop.mapreduce.OutputCommitter committer = null;
+
+      LOG.info("OutputCommitter set in config "
+          + conf.get("mapred.output.committer.class"));
+
+      if (newApiCommitter) {
+        org.apache.hadoop.mapreduce.TaskID taskId =
+            new org.apache.hadoop.mapreduce.TaskID(jobId, TaskType.MAP, 0);
+        org.apache.hadoop.mapreduce.TaskAttemptID taskAttemptID =
+            new org.apache.hadoop.mapreduce.TaskAttemptID(taskId, 0);
+        org.apache.hadoop.mapreduce.TaskAttemptContext taskContext = 
+            new TaskAttemptContextImpl(conf, taskAttemptID);
+        @SuppressWarnings("rawtypes")
+        OutputFormat outputFormat =
+          ReflectionUtils.newInstance(taskContext.getOutputFormatClass(), conf);
+        committer = outputFormat.getOutputCommitter(taskContext);
+      } else {
+        committer = ReflectionUtils.newInstance(conf.getClass(
+            "mapred.output.committer.class", FileOutputCommitter.class,
+            org.apache.hadoop.mapred.OutputCommitter.class), conf);
+      }
+      LOG.info("OutputCommitter is " + committer.getClass().getName());
+      return committer;
+    }
+
+    @Override
+    public void run() {
+      JobID jobId = profile.getJobID();
+      JobContext jContext = new JobContextImpl(job, jobId);
+      
+      org.apache.hadoop.mapreduce.OutputCommitter outputCommitter = null;
+      try {
+        outputCommitter = createOutputCommitter(conf.getUseNewMapper(), jobId, conf);
+      } catch (Exception e) {
+        LOG.info("Failed to createOutputCommitter", e);
+        return;
+      }
+      
+      try {
+        TaskSplitMetaInfo[] taskSplitMetaInfos = 
+          SplitMetaInfoReader.readSplitMetaInfo(jobId, localFs, conf, systemJobDir);
+
+        int numReduceTasks = job.getNumReduceTasks();
+        if (numReduceTasks > 1 || numReduceTasks < 0) {
+          // we only allow 0 or 1 reducer in local mode
+          numReduceTasks = 1;
+          job.setNumReduceTasks(1);
+        }
+        outputCommitter.setupJob(jContext);
+        status.setSetupProgress(1.0f);
+
+        Map<TaskAttemptID, TezTaskOutput> mapOutputFiles =
+            Collections.synchronizedMap(new HashMap<TaskAttemptID, TezTaskOutput>());
+
+        List<MapTaskRunnable> taskRunnables = getMapTaskRunnables(taskSplitMetaInfos,
+            jobId, mapOutputFiles);
+        ExecutorService mapService = createMapExecutor(taskRunnables.size());
+
+        // Start populating the executor with work units.
+        // They may begin running immediately (in other threads).
+        for (Runnable r : taskRunnables) {
+          mapService.submit(r);
+        }
+
+        try {
+          mapService.shutdown(); // Instructs queue to drain.
+
+          // Wait for tasks to finish; do not use a time-based timeout.
+          // (See http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=6179024)
+          LOG.info("Waiting for map tasks");
+          mapService.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
+        } catch (InterruptedException ie) {
+          // Cancel all threads.
+          mapService.shutdownNow();
+          throw ie;
+        }
+
+        LOG.info("Map task executor complete.");
+
+        // After waiting for the map tasks to complete, if any of these
+        // have thrown an exception, rethrow it now in the main thread context.
+        for (MapTaskRunnable r : taskRunnables) {
+          if (r.storedException != null) {
+            throw new Exception(r.storedException);
+          }
+        }
+
+        TaskAttemptID reduceId = new TaskAttemptID(new TaskID(
+            jobId, TaskType.REDUCE, 0), 0);
+        LOG.info("Starting task: " + reduceId);
+        try {
+          if (numReduceTasks > 0) {
+            String user = 
+                UserGroupInformation.getCurrentUser().getShortUserName();
+            JobConf localConf = new JobConf(job);
+            localConf.setUser(user);
+            localConf.set("mapreduce.jobtracker.address", "local");
+            localConf.setInt(TezJobConfig.TEZ_ENGINE_TASK_INDEGREE, mapIds.size());
+            setupChildMapredLocalDirs(reduceId, user, localConf);
+
+            TezEngineTask taskContext = new TezEngineTask(
+                IDConverter.fromMRTaskAttemptId(reduceId), user,
+                localConf.getJobName(), "TODO_vertexName", LocalFinalTask.class.getName(),
+                Collections.singletonList(new InputSpec("TODO_srcVertexName",
+                    mapIds.size())), null); 
+            Injector injector = Guice.createInjector(new LocalFinalTask());
+
+            // move map output to reduce input  
+            for (int i = 0; i < mapIds.size(); i++) {
+              if (!this.isInterrupted()) {
+                TaskAttemptID mapId = mapIds.get(i);
+                LOG.info("XXX mapId: " + i + 
+                    " LOCAL_DIR = " + 
+                    mapOutputFiles.get(mapId).getConf().get(
+                        TezJobConfig.LOCAL_DIR));
+                Path mapOut = mapOutputFiles.get(mapId).getOutputFile();
+                TezTaskOutput localOutputFile = new TezLocalTaskOutputFiles();
+                localOutputFile.setConf(localConf);
+                Path reduceIn =
+                  localOutputFile.getInputFileForWrite(
+                      IDConverter.fromMRTaskId(mapId.getTaskID()),
+                        localFs.getFileStatus(mapOut).getLen());
+                if (!localFs.mkdirs(reduceIn.getParent())) {
+                  throw new IOException("Mkdirs failed to create "
+                      + reduceIn.getParent().toString());
+                }
+                if (!localFs.rename(mapOut, reduceIn)) {
+                  throw new IOException("Couldn't rename " + mapOut);
+                }
+              } else {
+                throw new InterruptedException();
+              }
+            }
+            if (!this.isInterrupted()) {
+              reduce_tasks += 1;
+              myMetrics.launchReduce(reduceId);
+              TezEngineFactory factory = injector.getInstance(TezEngineFactory.class);
+              Task t = factory.createTask(taskContext);
+              t.initialize(localConf, Job.this);
+              try {
+                t.run();
+              } finally {
+                // Mirror the map-side pattern: always decrement, even on failure.
+                reduce_tasks -= 1;
+              }
+              myMetrics.completeReduce(reduceId);
+            } else {
+              throw new InterruptedException();
+            }
+          }
+        } finally {
+          for (TezTaskOutput output : mapOutputFiles.values()) {
+            output.removeAll();
+          }
+        }
+        // delete the temporary directory in output directory
+        // FIXME
+        //outputCommitter.commitJob(jContext);
+        status.setCleanupProgress(1.0f);
+
+        if (killed) {
+          this.status.setRunState(JobStatus.KILLED);
+        } else {
+          this.status.setRunState(JobStatus.SUCCEEDED);
+        }
+
+        JobEndNotifier.localRunnerNotification(job, status);
+
+      } catch (Throwable t) {
+        try {
+          outputCommitter.abortJob(jContext, 
+            org.apache.hadoop.mapreduce.JobStatus.State.FAILED);
+        } catch (IOException ioe) {
+          LOG.info("Error cleaning up job:" + id);
+        }
+        status.setCleanupProgress(1.0f);
+        if (killed) {
+          this.status.setRunState(JobStatus.KILLED);
+        } else {
+          this.status.setRunState(JobStatus.FAILED);
+        }
+        LOG.warn(id, t);
+
+        JobEndNotifier.localRunnerNotification(job, status);
+
+      } finally {
+        try {
+          fs.delete(systemJobFile.getParent(), true);  // delete submit dir
+          localFs.delete(localJobFile, true);              // delete local copy
+          // Cleanup distributed cache
+          localDistributedCacheManager.close();
+        } catch (IOException e) {
+          LOG.warn("Error cleaning up "+id+": "+e);
+        }
+      }
+    }
+
+    // TaskUmbilicalProtocol methods
+    @Override
+    public ContainerTask getTask(ContainerContext containerContext)
+        throws IOException {
+      return null;
+    }
+    
+    @Override
+    public synchronized boolean statusUpdate(TezTaskAttemptID taskId,
+        TezTaskStatus taskStatus) throws IOException, InterruptedException {
+      LOG.info(taskStatus.getStateString());
+      // mapIds holds MR attempt ids; convert the Tez id before searching,
+      // otherwise indexOf() never matches and map updates look like reduces.
+      int taskIndex = mapIds.indexOf(IDConverter.toMRTaskAttemptId(taskId));
+      if (taskIndex >= 0) {                       // mapping
+        float numTasks = (float) this.numMapTasks;
+
+        partialMapProgress[taskIndex] = taskStatus.getProgress();
+        mapCounters[taskIndex] = taskStatus.getCounters();
+
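+        // Overall map progress is the arithmetic mean of per-task progress;
+        // e.g. (illustrative) four tasks at {1.0, 0.5, 0.0, 0.0} report 0.375.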
+        float partialProgress = 0.0f;
+        for (float f : partialMapProgress) {
+          partialProgress += f;
+        }
+        status.setMapProgress(partialProgress / numTasks);
+      } else {
+        reduceCounters = taskStatus.getCounters();
+        status.setReduceProgress(taskStatus.getProgress());
+      }
+
+      // ignore phase
+      return true;
+    }
+
+    /** Return the current values of the counters for this job,
+     * including tasks that are in progress.
+     */
+    public synchronized TezCounters getCurrentCounters() {
+      if (null == mapCounters) {
+        // Counters not yet initialized for job.
+        return EMPTY_COUNTERS;
+      }
+
+      // Aggregate into a fresh instance: incrAllCounters mutates its target,
+      // and EMPTY_COUNTERS must remain empty.
+      TezCounters current = new TezCounters();
+      for (TezCounters c : mapCounters) {
+        current.incrAllCounters(c);
+      }
+      current.incrAllCounters(reduceCounters);
+      return current;
+    }
+
+    /**
+     * The task reports that it is in the commit-pending state and is
+     * waiting for the commit response.
+     */
+    @Override
+    public void commitPending(TezTaskAttemptID taskid,
+                              TezTaskStatus taskStatus) 
+    throws IOException, InterruptedException {
+      statusUpdate(taskid, taskStatus);
+    }
+
+    @Override
+    public void reportDiagnosticInfo(TezTaskAttemptID taskid, String trace) {
+      // Ignore for now
+    }
+    
+    @Override
+    public boolean ping(TezTaskAttemptID taskid) throws IOException {
+      return true;
+    }
+    
+    @Override
+    public boolean canCommit(TezTaskAttemptID taskid) throws IOException {
+      return true;
+    }
+    
+    @Override
+    public void done(TezTaskAttemptID taskId) throws IOException {
+      // Convert to the MR attempt id so the lookup can match (see statusUpdate).
+      int taskIndex = mapIds.indexOf(IDConverter.toMRTaskAttemptId(taskId));
+      if (taskIndex >= 0) {                       // mapping
+        status.setMapProgress(1.0f);
+      } else {
+        status.setReduceProgress(1.0f);
+      }
+    }
+
+    @Override
+    public synchronized void fsError(TezTaskAttemptID taskId, String message) 
+    throws IOException {
+      LOG.fatal("FSError: "+ message + "from task: " + taskId);
+    }
+
+    @Override
+    public void shuffleError(TezTaskAttemptID taskId, String message) 
+        throws IOException {
+      LOG.fatal("shuffleError: "+ message + "from task: " + taskId);
+    }
+    
+    @Override
+    public synchronized void fatalError(TezTaskAttemptID taskId, String msg) 
+    throws IOException {
+      LOG.fatal("Fatal: "+ msg + "from task: " + taskId);
+    }
+    
+    @Override
+    public TezTaskDependencyCompletionEventsUpdate 
+    getDependentTasksCompletionEvents(
+        int fromEventIdx, int maxEventsToFetch,
+        TezTaskAttemptID reduce) {
+      throw new UnsupportedOperationException(
+          "getDependentTasksCompletionEvents not supported in LocalJobRunner");
+    }
+
+    @Override
+    public void outputReady(TezTaskAttemptID taskAttemptId,
+        OutputContext outputContext) throws IOException {
+      // Ignore for now.
+    }
+
+    @Override
+    public ProceedToCompletionResponse proceedToCompletion(
+        TezTaskAttemptID taskAttemptId) throws IOException {
+      // TODO TEZAM5 Really depends on the module - inmem shuffle or not.
+      return new ProceedToCompletionResponse(true, true);
+    }
+  }
+
+  public LocalJobRunner(Configuration conf) throws IOException {
+    this(new JobConf(conf));
+  }
+
+  @Deprecated
+  public LocalJobRunner(JobConf conf) throws IOException {
+    this.fs = FileSystem.getLocal(conf);
+    this.conf = conf;
+    myMetrics = new LocalJobRunnerMetrics(new JobConf(conf));
+  }
+
+  // JobSubmissionProtocol methods
+
+  private static int jobid = 0;
+  public synchronized org.apache.hadoop.mapreduce.JobID getNewJobID() {
+    return new org.apache.hadoop.mapreduce.JobID("local", ++jobid);
+  }
+
+  public org.apache.hadoop.mapreduce.JobStatus submitJob(
+      org.apache.hadoop.mapreduce.JobID jobid, String jobSubmitDir,
+      Credentials credentials) throws IOException {
+    Job job = new Job(JobID.downgrade(jobid), jobSubmitDir);
+    job.job.setCredentials(credentials);
+    return job.status;
+  }
+
+  public void killJob(org.apache.hadoop.mapreduce.JobID id) {
+    jobs.get(JobID.downgrade(id)).killed = true;
+    jobs.get(JobID.downgrade(id)).interrupt();
+  }
+
+  public void setJobPriority(org.apache.hadoop.mapreduce.JobID id,
+      String jp) throws IOException {
+    throw new UnsupportedOperationException("Changing job priority " +
+                      "in LocalJobRunner is not supported.");
+  }
+  
+  /** Throws {@link UnsupportedOperationException} */
+  public boolean killTask(org.apache.hadoop.mapreduce.TaskAttemptID taskId,
+      boolean shouldFail) throws IOException {
+    throw new UnsupportedOperationException("Killing tasks in " +
+    "LocalJobRunner is not supported");
+  }
+
+  public org.apache.hadoop.mapreduce.TaskReport[] getTaskReports(
+      org.apache.hadoop.mapreduce.JobID id, TaskType type) {
+    return new org.apache.hadoop.mapreduce.TaskReport[0];
+  }
+
+  public org.apache.hadoop.mapreduce.JobStatus getJobStatus(
+      org.apache.hadoop.mapreduce.JobID id) {
+    Job job = jobs.get(JobID.downgrade(id));
+    if (job != null) {
+      return job.status;
+    } else {
+      return null;
+    }
+  }
+  
+  public org.apache.hadoop.mapreduce.Counters getJobCounters(
+      org.apache.hadoop.mapreduce.JobID id) {
+    Job job = jobs.get(JobID.downgrade(id));
+
+    return new org.apache.hadoop.mapreduce.Counters(
+        new MRCounters(job.getCurrentCounters()));
+  }
+
+  public String getFilesystemName() throws IOException {
+    return fs.getUri().toString();
+  }
+  
+  public ClusterMetrics getClusterMetrics() {
+    int numMapTasks = map_tasks.get();
+    return new ClusterMetrics(numMapTasks, reduce_tasks, numMapTasks,
+        reduce_tasks, 0, 0, 1, 1, jobs.size(), 1, 0, 0);
+  }
+
+  public JobTrackerStatus getJobTrackerStatus() {
+    return JobTrackerStatus.RUNNING;
+  }
+
+  public long getTaskTrackerExpiryInterval() 
+      throws IOException, InterruptedException {
+    return 0;
+  }
+
+  /** 
+   * Get all active trackers in cluster. 
+   * @return array of TaskTrackerInfo
+   */
+  public TaskTrackerInfo[] getActiveTrackers() 
+      throws IOException, InterruptedException {
+    return null;
+  }
+
+  /** 
+   * Get all blacklisted trackers in cluster. 
+   * @return array of TaskTrackerInfo
+   */
+  public TaskTrackerInfo[] getBlacklistedTrackers() 
+      throws IOException, InterruptedException {
+    return null;
+  }
+
+  public TaskCompletionEvent[] getTaskCompletionEvents(
+      org.apache.hadoop.mapreduce.JobID jobid
+      , int fromEventId, int maxEvents) throws IOException {
+    return TaskCompletionEvent.EMPTY_ARRAY;
+  }
+  
+  public org.apache.hadoop.mapreduce.JobStatus[] getAllJobs() {
+    return null;
+  }
+
+  /**
+   * Returns the diagnostic information for a particular task in the given job.
+   * To be implemented
+   */
+  public String[] getTaskDiagnostics(
+      org.apache.hadoop.mapreduce.TaskAttemptID taskid) throws IOException{
+    return new String[0];
+  }
+
+  /**
+   * @see org.apache.hadoop.mapreduce.protocol.ClientProtocol#getSystemDir()
+   */
+  public String getSystemDir() {
+    Path sysDir = new Path(
+      conf.get(JTConfig.JT_SYSTEM_DIR, "/tmp/hadoop/mapred/system"));  
+    return fs.makeQualified(sysDir).toString();
+  }
+
+  /**
+   * @see org.apache.hadoop.mapreduce.protocol.ClientProtocol#getQueueAdmins(String)
+   */
+  public AccessControlList getQueueAdmins(String queueName) throws IOException {
+    return new AccessControlList(" "); // no queue admins for the local job runner
+  }
+
+  /**
+   * @see org.apache.hadoop.mapreduce.protocol.ClientProtocol#getStagingAreaDir()
+   */
+  public String getStagingAreaDir() throws IOException {
+    Path stagingRootDir = new Path(conf.get(JTConfig.JT_STAGING_AREA_ROOT, 
+        "/tmp/hadoop/mapred/staging"));
+    UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
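+    // The random suffix presumably keeps concurrent local submissions from
+    // colliding under the shared staging root.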
+    String user;
+    if (ugi != null) {
+      user = ugi.getShortUserName() + rand.nextInt();
+    } else {
+      user = "dummy" + rand.nextInt();
+    }
+    return fs.makeQualified(new Path(stagingRootDir, user+"/.staging")).toString();
+  }
+  
+  public String getJobHistoryDir() {
+    return null;
+  }
+
+  @Override
+  public QueueInfo[] getChildQueues(String queueName) throws IOException {
+    return null;
+  }
+
+  @Override
+  public QueueInfo[] getRootQueues() throws IOException {
+    return null;
+  }
+
+  @Override
+  public QueueInfo[] getQueues() throws IOException {
+    return null;
+  }
+
+  @Override
+  public QueueInfo getQueue(String queue) throws IOException {
+    return null;
+  }
+
+  @Override
+  public org.apache.hadoop.mapreduce.QueueAclsInfo[] 
+      getQueueAclsForCurrentUser() throws IOException{
+    return null;
+  }
+
+  /**
+   * Set the max number of map tasks to run concurrently in the LocalJobRunner.
+   * @param job the job to configure
+   * @param maxMaps the maximum number of map tasks to allow.
+   */
+  public static void setLocalMaxRunningMaps(
+      org.apache.hadoop.mapreduce.JobContext job,
+      int maxMaps) {
+    job.getConfiguration().setInt(LOCAL_MAX_MAPS, maxMaps);
+  }
+
+  /**
+   * @return the max number of map tasks to run concurrently in the
+   * LocalJobRunner.
+   */
+  public static int getLocalMaxRunningMaps(
+      org.apache.hadoop.mapreduce.JobContext job) {
+    return job.getConfiguration().getInt(LOCAL_MAX_MAPS, 1);
+  }
+
+  @Override
+  public void cancelDelegationToken(Token<DelegationTokenIdentifier> token
+                                       ) throws IOException,
+                                                InterruptedException {
+  }
+
+  @Override
+  public Token<DelegationTokenIdentifier> 
+     getDelegationToken(Text renewer) throws IOException, InterruptedException {
+    return null;
+  }
+
+  @Override
+  public long renewDelegationToken(Token<DelegationTokenIdentifier> token
+                                      ) throws IOException,InterruptedException{
+    return 0;
+  }
+
+  @Override
+  public LogParams getLogFileParams(org.apache.hadoop.mapreduce.JobID jobID,
+      org.apache.hadoop.mapreduce.TaskAttemptID taskAttemptID)
+      throws IOException, InterruptedException {
+    throw new UnsupportedOperationException("Not supported");
+  }
+  
+  static void setupChildMapredLocalDirs(
+      TaskAttemptID taskAttemptID, String user, JobConf conf) {
+    String[] localDirs = 
+        conf.getTrimmedStrings(
+            TezJobConfig.LOCAL_DIR, TezJobConfig.DEFAULT_LOCAL_DIR);
+    String jobId = taskAttemptID.getJobID().toString();
+    String taskId = taskAttemptID.getTaskID().toString();
+    boolean isCleanup = false;
+    StringBuilder childMapredLocalDir =
+        new StringBuilder(localDirs[0] + Path.SEPARATOR
+            + getLocalTaskDir(user, jobId, taskId, isCleanup));
+    for (int i = 1; i < localDirs.length; i++) {
+      childMapredLocalDir.append("," + localDirs[i] + Path.SEPARATOR
+          + getLocalTaskDir(user, jobId, taskId, isCleanup));
+    }
+    LOG.info(TezJobConfig.LOCAL_DIR + " for child : " + taskAttemptID + 
+        " is " + childMapredLocalDir);
+    conf.set(TezJobConfig.LOCAL_DIR, childMapredLocalDir.toString());
+    conf.setClass(Constants.TEZ_ENGINE_TASK_OUTPUT_MANAGER, 
+        TezLocalTaskOutputFiles.class, TezTaskOutput.class);
+  }
+  
+  static final String TASK_CLEANUP_SUFFIX = ".cleanup";
+  static final String SUBDIR = jobDir;
+  static final String JOBCACHE = "jobcache";
+  
+  static String getLocalTaskDir(String user, String jobid, String taskid,
+      boolean isCleanupAttempt) {
+    String taskDir = SUBDIR + Path.SEPARATOR + user + Path.SEPARATOR + JOBCACHE
+      + Path.SEPARATOR + jobid + Path.SEPARATOR + taskid;
+    if (isCleanupAttempt) {
+      taskDir = taskDir + TASK_CLEANUP_SUFFIX;
+    }
+    return taskDir;
+  }
+}

Propchange: incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/LocalJobRunner.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/LocalJobRunnerMetrics.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/LocalJobRunnerMetrics.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/LocalJobRunnerMetrics.java (added)
+++ incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/LocalJobRunnerMetrics.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import org.apache.hadoop.metrics.MetricsContext;
+import org.apache.hadoop.metrics.MetricsRecord;
+import org.apache.hadoop.metrics.MetricsUtil;
+import org.apache.hadoop.metrics.Updater;
+import org.apache.hadoop.metrics.jvm.JvmMetrics;
+
+@SuppressWarnings("deprecation")
+class LocalJobRunnerMetrics implements Updater {
+  private final MetricsRecord metricsRecord;
+
+  private int numMapTasksLaunched = 0;
+  private int numMapTasksCompleted = 0;
+  private int numReduceTasksLaunched = 0;
+  private int numReduceTasksCompleted = 0;
+  private int numWaitingMaps = 0;
+  private int numWaitingReduces = 0;
+  
+  public LocalJobRunnerMetrics(JobConf conf) {
+    String sessionId = conf.getSessionId();
+    // Initiate JVM Metrics
+    JvmMetrics.init("JobTracker", sessionId);
+    // Create a record for map-reduce metrics
+    MetricsContext context = MetricsUtil.getContext("mapred");
+    // record name is jobtracker for compatibility 
+    metricsRecord = MetricsUtil.createRecord(context, "jobtracker");
+    metricsRecord.setTag("sessionId", sessionId);
+    context.registerUpdater(this);
+  }
+    
+  /**
+   * Since this object is a registered updater, this method will be called
+   * periodically, e.g. every 5 seconds.
+   */
+  public void doUpdates(MetricsContext unused) {
+    synchronized (this) {
+      metricsRecord.incrMetric("maps_launched", numMapTasksLaunched);
+      metricsRecord.incrMetric("maps_completed", numMapTasksCompleted);
+      metricsRecord.incrMetric("reduces_launched", numReduceTasksLaunched);
+      metricsRecord.incrMetric("reduces_completed", numReduceTasksCompleted);
+      metricsRecord.incrMetric("waiting_maps", numWaitingMaps);
+      metricsRecord.incrMetric("waiting_reduces", numWaitingReduces);
+
+      numMapTasksLaunched = 0;
+      numMapTasksCompleted = 0;
+      numReduceTasksLaunched = 0;
+      numReduceTasksCompleted = 0;
+      numWaitingMaps = 0;
+      numWaitingReduces = 0;
+    }
+    metricsRecord.update();
+  }
+
+  public synchronized void launchMap(TaskAttemptID taskAttemptID) {
+    ++numMapTasksLaunched;
+    decWaitingMaps(taskAttemptID.getJobID(), 1);
+  }
+
+  public synchronized void completeMap(TaskAttemptID taskAttemptID) {
+    ++numMapTasksCompleted;
+  }
+
+  public synchronized void launchReduce(TaskAttemptID taskAttemptID) {
+    ++numReduceTasksLaunched;
+    decWaitingReduces(taskAttemptID.getJobID(), 1);
+  }
+
+  public synchronized void completeReduce(TaskAttemptID taskAttemptID) {
+    ++numReduceTasksCompleted;
+  }
+
+  private synchronized void decWaitingMaps(JobID id, int task) {
+    numWaitingMaps -= task;
+  }
+  
+  private synchronized void decWaitingReduces(JobID id, int task){
+    numWaitingReduces -= task;
+  }
+
+}

Propchange: incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/LocalJobRunnerMetrics.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/hadoop/mapreduce/split/SplitMetaInfoReaderTez.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/hadoop/mapreduce/split/SplitMetaInfoReaderTez.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/hadoop/mapreduce/split/SplitMetaInfoReaderTez.java (added)
+++ incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/hadoop/mapreduce/split/SplitMetaInfoReaderTez.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.split;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Arrays;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
+import org.apache.tez.mapreduce.hadoop.MRJobConfig;
+
+/**
+ * A utility that reads the split meta info and creates split meta info objects
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class SplitMetaInfoReaderTez {
+
+  public static final Log LOG = LogFactory.getLog(SplitMetaInfoReaderTez.class);
+
+  // Forked from the MR variant so that the metaInfo file as well as the split
+  // file can be read from local fs - relying on these files being localized.
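+  //
+  // Hypothetical caller sketch: once the split and split meta-info files
+  // have been localized into the working directory, a task could read its
+  // split meta info with:
+  //   TaskSplitMetaInfo[] splits = SplitMetaInfoReaderTez.readSplitMetaInfo(
+  //       conf, FileSystem.getLocal(conf));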
+  public static TaskSplitMetaInfo[] readSplitMetaInfo(Configuration conf,
+      FileSystem fs) throws IOException {
+
+    long maxMetaInfoSize = conf.getLong(
+        MRJobConfig.SPLIT_METAINFO_MAXSIZE,
+        MRJobConfig.DEFAULT_SPLIT_METAINFO_MAXSIZE);
+
+    Path metaSplitFile = new Path(MRJobConfig.JOB_SPLIT_METAINFO);
+    String jobSplitFile = MRJobConfig.JOB_SPLIT;
+
+    File file = new File(metaSplitFile.toUri().getPath()).getAbsoluteFile();
+    LOG.info("DEBUG: Setting up JobSplitIndex with JobSplitFile at: "
+        + file.getAbsolutePath() + ", defaultFS from conf: "
+        + FileSystem.getDefaultUri(conf));
+    
+    FileStatus fStatus = fs.getFileStatus(metaSplitFile);
+    if (maxMetaInfoSize > 0 && fStatus.getLen() > maxMetaInfoSize) {
+      throw new IOException("Split metadata size exceeded " + maxMetaInfoSize
+          + ". Aborting job ");
+    }
+    FSDataInputStream in = fs.open(metaSplitFile);
+    byte[] header = new byte[JobSplit.META_SPLIT_FILE_HEADER.length];
+    in.readFully(header);
+    if (!Arrays.equals(JobSplit.META_SPLIT_FILE_HEADER, header)) {
+      in.close(); // close before throwing, matching the version check below
+      throw new IOException("Invalid header on split file");
+    }
+    int vers = WritableUtils.readVInt(in);
+    if (vers != JobSplit.META_SPLIT_VERSION) {
+      in.close();
+      throw new IOException("Unsupported split version " + vers);
+    }
+    int numSplits = WritableUtils.readVInt(in); // TODO: check for insane values
+    JobSplit.TaskSplitMetaInfo[] allSplitMetaInfo = new JobSplit.TaskSplitMetaInfo[numSplits];
+    for (int i = 0; i < numSplits; i++) {
+      JobSplit.SplitMetaInfo splitMetaInfo = new JobSplit.SplitMetaInfo();
+      splitMetaInfo.readFields(in);
+      JobSplit.TaskSplitIndex splitIndex = new JobSplit.TaskSplitIndex(
+          jobSplitFile, splitMetaInfo.getStartOffset());
+      allSplitMetaInfo[i] = new JobSplit.TaskSplitMetaInfo(splitIndex,
+          splitMetaInfo.getLocations(), splitMetaInfo.getInputDataLength());
+    }
+    in.close();
+    return allSplitMetaInfo;
+  }
+
+}

Propchange: incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/hadoop/mapreduce/split/SplitMetaInfoReaderTez.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/ContainerContext.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/ContainerContext.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/ContainerContext.java (added)
+++ incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/ContainerContext.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,70 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.mapreduce.hadoop;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.util.Records;
+import org.apache.tez.records.TezContainerId;
+
+// TODO EVENTUALLY move this over to PB. Fix package/module.
+// TODO EVENTUALLY unit tests for functionality.
+public class ContainerContext implements Writable {
+
+  ContainerId containerId;
+  String pid;
+
+  public ContainerContext() {
+    containerId = Records.newRecord(ContainerId.class);
+    pid = "";
+  }
+
+  public ContainerContext(ContainerId containerId, String pid) {
+    this.containerId = containerId;
+    this.pid = pid;
+  }
+
+  public ContainerId getContainerId() {
+    return containerId;
+  }
+
+  public String getPid() {
+    return pid;
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    TezContainerId tezContainerId = new TezContainerId();
+    tezContainerId.readFields(in);
+    this.containerId = tezContainerId.getContainerId();
+    this.pid = Text.readString(in);
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    TezContainerId tezContainerId = new TezContainerId(containerId);
+    tezContainerId.write(out);
+    Text.writeString(out, pid);
+  }
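+
+  // Illustrative note: this object travels over the Writable-based umbilical
+  // RPC (see TezTaskUmbilicalProtocol.getTask(ContainerContext)), so write()
+  // and readFields() above must remain symmetric field-for-field.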
+}

Propchange: incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/ContainerContext.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/DeprecatedKeys.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/DeprecatedKeys.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/DeprecatedKeys.java (added)
+++ incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/DeprecatedKeys.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,96 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.mapreduce.hadoop;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.tez.common.TezJobConfig;
+
+
+public class DeprecatedKeys {
+  static {
+    addDeprecatedKeys();
+  }
+
+  // TODO TEZAM4 At some point, make sure this gets loaded by default instead
+  // of the current initialization in MRAppMaster and TezChild.
+  // Maybe define it in a TezConfiguration / Tez JobConf variant.
+  
+  public static void init() {
+  }
+  
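+  // Illustrative effect (assuming these MR constants carry the usual Hadoop
+  // key names): once addDeprecatedKeys() has run, a job that still sets the
+  // deprecated "mapreduce.job.reduces" key is transparently readable through
+  // TezJobConfig.TEZ_ENGINE_TASK_OUTDEGREE, courtesy of
+  // Configuration.addDeprecation.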
+  private static void addDeprecatedKeys() {
+    
+    _(MRConfig.MAPRED_IFILE_READAHEAD, TezJobConfig.TEZ_ENGINE_IFILE_READAHEAD);
+
+    _(MRConfig.MAPRED_IFILE_READAHEAD_BYTES, TezJobConfig.TEZ_ENGINE_IFILE_READAHEAD_BYTES);
+    
+    _(MRJobConfig.RECORDS_BEFORE_PROGRESS, TezJobConfig.RECORDS_BEFORE_PROGRESS);
+    
+    _(MRJobConfig.JOB_LOCAL_DIR, MRConfig.LOCAL_DIR);
+        
+    _(MRJobConfig.NUM_REDUCES, TezJobConfig.TEZ_ENGINE_TASK_OUTDEGREE);
+
+    _(MRJobConfig.NUM_MAPS, TezJobConfig.TEZ_ENGINE_TASK_INDEGREE);
+    
+    _(MRJobConfig.IO_SORT_FACTOR, TezJobConfig.TEZ_ENGINE_IO_SORT_FACTOR);
+    
+    _(MRJobConfig.MAP_SORT_SPILL_PERCENT, TezJobConfig.TEZ_ENGINE_SORT_SPILL_PERCENT);
+    
+    _(MRJobConfig.IO_SORT_MB, TezJobConfig.TEZ_ENGINE_IO_SORT_MB);
+    
+    _(MRJobConfig.INDEX_CACHE_MEMORY_LIMIT, TezJobConfig.TEZ_ENGINE_INDEX_CACHE_MEMORY_LIMIT_BYTES);
+    
+    _(MRJobConfig.MAP_COMBINE_MIN_SPILLS, TezJobConfig.TEZ_ENGINE_COMBINE_MIN_SPILLS);
+    
+    _(MRJobConfig.COUNTERS_MAX_KEY, TezJobConfig.COUNTERS_MAX_KEY);
+    
+    _(MRJobConfig.COUNTER_GROUP_NAME_MAX_KEY, TezJobConfig.COUNTER_GROUP_NAME_MAX_KEY);
+    
+    _(MRJobConfig.COUNTER_NAME_MAX_KEY, TezJobConfig.COUNTER_NAME_MAX_KEY);
+    
+    _(MRJobConfig.COUNTER_GROUPS_MAX_KEY, TezJobConfig.COUNTER_GROUPS_MAX_KEY);
+    
+    _(MRJobConfig.SHUFFLE_PARALLEL_COPIES, TezJobConfig.TEZ_ENGINE_SHUFFLE_PARALLEL_COPIES);
+    
+    _(MRJobConfig.SHUFFLE_FETCH_FAILURES, TezJobConfig.TEZ_ENGINE_SHUFFLE_FETCH_FAILURES);
+    
+    _(MRJobConfig.SHUFFLE_NOTIFY_READERROR, TezJobConfig.TEZ_ENGINE_SHUFFLE_NOTIFY_READERROR);
+    
+    _(MRJobConfig.SHUFFLE_CONNECT_TIMEOUT, TezJobConfig.TEZ_ENGINE_SHUFFLE_CONNECT_TIMEOUT);
+    
+    _(MRJobConfig.SHUFFLE_READ_TIMEOUT, TezJobConfig.TEZ_ENGINE_SHUFFLE_READ_TIMEOUT);
+    
+    _(MRConfig.SHUFFLE_SSL_ENABLED_KEY, TezJobConfig.TEZ_ENGINE_SHUFFLE_ENABLE_SSL);
+    
+    _(MRJobConfig.SHUFFLE_INPUT_BUFFER_PERCENT, TezJobConfig.TEZ_ENGINE_SHUFFLE_INPUT_BUFFER_PERCENT);
+    
+    _(MRJobConfig.SHUFFLE_MEMORY_LIMIT_PERCENT, TezJobConfig.TEZ_ENGINE_SHUFFLE_MEMORY_LIMIT_PERCENT);
+    
+    _(MRJobConfig.SHUFFLE_MERGE_PERCENT, TezJobConfig.TEZ_ENGINE_SHUFFLE_MERGE_PERCENT);
+    
+    _(MRJobConfig.REDUCE_MEMTOMEM_THRESHOLD, TezJobConfig.TEZ_ENGINE_SHUFFLE_MEMTOMEM_SEGMENTS);
+    
+    _(MRJobConfig.REDUCE_MEMTOMEM_ENABLED, TezJobConfig.TEZ_ENGINE_SHUFFLE_ENABLE_MEMTOMEM);
+    
+    _(MRJobConfig.SHUFFLE_INPUT_BUFFER_PERCENT, TezJobConfig.TEZ_ENGINE_INPUT_BUFFER_PERCENT);
+  }
+  
+  private static void _(String oldKey, String newKey) {
+    Configuration.addDeprecation(oldKey, newKey);
+  }
+}

Propchange: incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/DeprecatedKeys.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/IDConverter.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/IDConverter.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/IDConverter.java (added)
+++ incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/IDConverter.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,96 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.mapreduce.hadoop;
+
+import org.apache.hadoop.mapred.JobID;
+import org.apache.hadoop.mapred.TaskAttemptID;
+import org.apache.hadoop.mapred.TaskID;
+import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.util.BuilderUtils;
+import org.apache.tez.engine.records.TezDAGID;
+import org.apache.tez.engine.records.TezTaskAttemptID;
+import org.apache.tez.engine.records.TezTaskID;
+import org.apache.tez.engine.records.TezVertexID;
+
+public class IDConverter {
+
+  // FIXME hardcoded assumption that one app is one dag
+  public static JobID toMRJobId(TezDAGID dagId) {
+    return new JobID(
+        Long.toString(dagId.getApplicationId().getClusterTimestamp()),
+        dagId.getApplicationId().getId());
+  }
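+
+  // Illustrative round trip under the one-app-one-DAG assumption above: for
+  // jobs whose jtIdentifier is the numeric cluster timestamp,
+  // toMRJobId(fromMRJobId(mrJobId)).equals(mrJobId) holds, since the DAG id
+  // reuses the application's cluster timestamp and sequence number.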
+  
+  public static TaskID toMRTaskId(TezTaskID taskid) {
+    return new TaskID(
+        toMRJobId(taskid.getVertexID().getDAGId()),
+        taskid.getVertexID().getId() == 0 ? TaskType.MAP : TaskType.REDUCE,
+        taskid.getId());
+  }
+
+  public static TaskAttemptID toMRTaskAttemptId(
+      TezTaskAttemptID taskAttemptId) {
+    return new TaskAttemptID(
+        toMRTaskId(taskAttemptId.getTaskID()),
+        taskAttemptId.getId());
+  }
+  
+  // FIXME hardcoded assumption that one app is one dag
+  public static TezDAGID fromMRJobId(
+      org.apache.hadoop.mapreduce.JobID jobId) {
+    return new TezDAGID(BuilderUtils.newApplicationId(
+        Long.valueOf(jobId.getJtIdentifier()), jobId.getId()), 1);
+  }
+
+  public static MRTaskType fromMRTaskType(TaskType type) {
+    switch (type) {
+      case REDUCE:
+        return MRTaskType.REDUCE;
+      case JOB_SETUP:
+        return MRTaskType.JOB_SETUP;
+      case JOB_CLEANUP:
+        return MRTaskType.JOB_CLEANUP;
+      case TASK_CLEANUP:
+        return MRTaskType.TASK_CLEANUP;
+      case MAP:
+        return MRTaskType.MAP;
+      default:
+        throw new RuntimeException("Unknown TaskType: " + type);
+    }
+  }
+
+  // FIXME hack alert converting objects with hard coded id
+  public static TezTaskID
+      fromMRTaskId(org.apache.hadoop.mapreduce.TaskID taskid) {
+    return new TezTaskID(
+        new TezVertexID(fromMRJobId(taskid.getJobID()),
+                (taskid.getTaskType() == TaskType.MAP ? 0 : 1)
+            ),
+        taskid.getId());
+  }
+
+  public static TezTaskAttemptID fromMRTaskAttemptId(
+      org.apache.hadoop.mapreduce.TaskAttemptID taskAttemptId) {
+    return new TezTaskAttemptID(
+        fromMRTaskId(taskAttemptId.getTaskID()),
+        taskAttemptId.getId());
+  }
+  
+}

Propchange: incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/IDConverter.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRConfig.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRConfig.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRConfig.java (added)
+++ incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRConfig.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.mapreduce.hadoop;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+
+/**
+ * Placeholder for cluster-level configuration keys.
+ * 
+ * The keys should have "mapreduce.cluster." as the prefix. 
+ *
+ */
+@InterfaceAudience.Private
+public interface MRConfig {
+
+  // Cluster-level configuration parameters
+  public static final String TEMP_DIR = "mapreduce.cluster.temp.dir";
+  public static final String LOCAL_DIR = "mapreduce.cluster.local.dir";
+  public static final String MAPMEMORY_MB = "mapreduce.cluster.mapmemory.mb";
+  public static final String REDUCEMEMORY_MB = 
+    "mapreduce.cluster.reducememory.mb";
+  public static final String MR_ACLS_ENABLED = "mapreduce.cluster.acls.enabled";
+  public static final String MR_ADMINS =
+    "mapreduce.cluster.administrators";
+  @Deprecated
+  public static final String MR_SUPERGROUP =
+    "mapreduce.cluster.permissions.supergroup";
+
+  //Delegation token related keys
+  public static final String  DELEGATION_KEY_UPDATE_INTERVAL_KEY = 
+    "mapreduce.cluster.delegation.key.update-interval";
+  public static final long    DELEGATION_KEY_UPDATE_INTERVAL_DEFAULT = 
+    24*60*60*1000; // 1 day
+  public static final String  DELEGATION_TOKEN_RENEW_INTERVAL_KEY = 
+    "mapreduce.cluster.delegation.token.renew-interval";
+  public static final long    DELEGATION_TOKEN_RENEW_INTERVAL_DEFAULT = 
+    24*60*60*1000;  // 1 day
+  public static final String  DELEGATION_TOKEN_MAX_LIFETIME_KEY = 
+    "mapreduce.cluster.delegation.token.max-lifetime";
+  public static final long    DELEGATION_TOKEN_MAX_LIFETIME_DEFAULT = 
+    7*24*60*60*1000; // 7 days
+  
+  public static final String RESOURCE_CALCULATOR_PROCESS_TREE =
+    "mapreduce.job.process-tree.class";
+  public static final String STATIC_RESOLUTIONS = 
+    "mapreduce.job.net.static.resolutions";
+
+  public static final String MASTER_ADDRESS  = "mapreduce.jobtracker.address";
+  public static final String MASTER_USER_NAME = 
+    "mapreduce.jobtracker.kerberos.principal";
+
+  public static final String FRAMEWORK_NAME  = "mapreduce.framework.name";
+  public static final String CLASSIC_FRAMEWORK_NAME  = "classic";
+  public static final String YARN_TEZ_FRAMEWORK_NAME  = "yarn-tez";
+  public static final String LOCAL_FRAMEWORK_NAME = "local";
+
+  public static final String TASK_LOCAL_OUTPUT_CLASS =
+  "mapreduce.task.local.output.class";
+
+  public static final String PROGRESS_STATUS_LEN_LIMIT_KEY =
+    "mapreduce.task.max.status.length";
+  public static final int PROGRESS_STATUS_LEN_LIMIT_DEFAULT = 512;
+
+  public static final int MAX_BLOCK_LOCATIONS_DEFAULT = 10;
+  public static final String MAX_BLOCK_LOCATIONS_KEY =
+    "mapreduce.job.max.split.locations";
+
+  public static final String SHUFFLE_SSL_ENABLED_KEY =
+    "mapreduce.shuffle.ssl.enabled";
+
+  public static final boolean SHUFFLE_SSL_ENABLED_DEFAULT = false;
+
+  /**
+   * Configuration key to enable/disable IFile readahead.
+   */
+  public static final String MAPRED_IFILE_READAHEAD =
+    "mapreduce.ifile.readahead";
+
+  public static final boolean DEFAULT_MAPRED_IFILE_READAHEAD = true;
+
+  /**
+   * Configuration key to set the IFile readahead length in bytes.
+   */
+  public static final String MAPRED_IFILE_READAHEAD_BYTES =
+    "mapreduce.ifile.readahead.bytes";
+
+  public static final int DEFAULT_MAPRED_IFILE_READAHEAD_BYTES =
+    4 * 1024 * 1024;
+}

Propchange: incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRConfig.java
------------------------------------------------------------------------------
    svn:eol-style = native