You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by hi...@apache.org on 2013/04/19 01:54:28 UTC

svn commit: r1469642 [32/36] - in /incubator/tez/branches/TEZ-1: ./ example_jobs/ example_jobs/sampleInput/ example_jobs/wc_mr_6m_1r/ example_jobs/wc_mrr_6m_3r_3r/ ljr_helper/ tez-common/ tez-common/src/ tez-common/src/main/ tez-common/src/main/java/ t...

Added: incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRJobConfig.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRJobConfig.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRJobConfig.java (added)
+++ incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRJobConfig.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,676 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.mapreduce.hadoop;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public interface MRJobConfig {
+
+  // Put all of the attribute names in here so that Job and JobContext are
+  // consistent.
+  public static final String INPUT_FORMAT_CLASS_ATTR = "mapreduce.job.inputformat.class";
+
+  public static final String MAP_CLASS_ATTR = "mapreduce.job.map.class";
+
+  public static final String COMBINE_CLASS_ATTR = "mapreduce.job.combine.class";
+
+  public static final String REDUCE_CLASS_ATTR = "mapreduce.job.reduce.class";
+
+  public static final String OUTPUT_FORMAT_CLASS_ATTR = "mapreduce.job.outputformat.class";
+
+  public static final String PARTITIONER_CLASS_ATTR = "mapreduce.job.partitioner.class";
+
+  public static final String SETUP_CLEANUP_NEEDED = "mapreduce.job.committer.setup.cleanup.needed";
+
+  public static final String TASK_CLEANUP_NEEDED = "mapreduce.job.committer.task.cleanup.needed";
+
+  public static final String JAR = "mapreduce.job.jar";
+
+  public static final String ID = "mapreduce.job.id";
+
+  public static final String JOB_NAME = "mapreduce.job.name";
+
+  public static final String JAR_UNPACK_PATTERN = "mapreduce.job.jar.unpack.pattern";
+
+  public static final String USER_NAME = "mapreduce.job.user.name";
+
+  public static final String PRIORITY = "mapreduce.job.priority";
+
+  public static final String QUEUE_NAME = "mapreduce.job.queuename";
+
+  public static final String JVM_NUMTASKS_TORUN = "mapreduce.job.jvm.numtasks";
+
+  public static final String SPLIT_FILE = "mapreduce.job.splitfile";
+  
+  public static final String SPLIT_METAINFO_MAXSIZE = "mapreduce.job.split.metainfo.maxsize";
+  public static final long DEFAULT_SPLIT_METAINFO_MAXSIZE = 10000000L;
+
+  public static final String NUM_MAPS = "mapreduce.job.maps";
+
+  public static final String MAX_TASK_FAILURES_PER_TRACKER = "mapreduce.job.maxtaskfailures.per.tracker";
+
+  public static final String COMPLETED_MAPS_FOR_REDUCE_SLOWSTART = "mapreduce.job.reduce.slowstart.completedmaps";
+
+  public static final String NUM_REDUCES = "mapreduce.job.reduces";
+
+  public static final String SKIP_RECORDS = "mapreduce.job.skiprecords";
+
+  public static final String SKIP_OUTDIR = "mapreduce.job.skip.outdir";
+
+  public static final String SPECULATIVE_SLOWNODE_THRESHOLD = "mapreduce.job.speculative.slownodethreshold";
+
+  public static final String SPECULATIVE_SLOWTASK_THRESHOLD = "mapreduce.job.speculative.slowtaskthreshold";
+
+  public static final String SPECULATIVECAP = "mapreduce.job.speculative.speculativecap";
+
+  public static final String JOB_LOCAL_DIR = "mapreduce.job.local.dir";
+
+  public static final String OUTPUT_KEY_CLASS = "mapreduce.job.output.key.class";
+
+  public static final String OUTPUT_VALUE_CLASS = "mapreduce.job.output.value.class";
+
+  public static final String KEY_COMPARATOR = "mapreduce.job.output.key.comparator.class";
+
+  public static final String GROUP_COMPARATOR_CLASS = "mapreduce.job.output.group.comparator.class";
+
+  public static final String WORKING_DIR = "mapreduce.job.working.dir";
+
+  public static final String CLASSPATH_ARCHIVES = "mapreduce.job.classpath.archives";
+
+  public static final String CLASSPATH_FILES = "mapreduce.job.classpath.files";
+
+  public static final String CACHE_FILES = "mapreduce.job.cache.files";
+
+  public static final String CACHE_ARCHIVES = "mapreduce.job.cache.archives";
+
+  public static final String CACHE_FILES_SIZES = "mapreduce.job.cache.files.filesizes"; // internal use only
+
+  public static final String CACHE_ARCHIVES_SIZES = "mapreduce.job.cache.archives.filesizes"; // ditto
+
+  public static final String CACHE_LOCALFILES = "mapreduce.job.cache.local.files";
+
+  public static final String CACHE_LOCALARCHIVES = "mapreduce.job.cache.local.archives";
+
+  public static final String CACHE_FILE_TIMESTAMPS = "mapreduce.job.cache.files.timestamps";
+
+  public static final String CACHE_ARCHIVES_TIMESTAMPS = "mapreduce.job.cache.archives.timestamps";
+
+  public static final String CACHE_FILE_VISIBILITIES = "mapreduce.job.cache.files.visibilities";
+
+  public static final String CACHE_ARCHIVES_VISIBILITIES = "mapreduce.job.cache.archives.visibilities";
+
+  /**
+   * @deprecated Symlinks are always on and cannot be disabled.
+   */
+  @Deprecated
+  public static final String CACHE_SYMLINK = "mapreduce.job.cache.symlink.create";
+
+  public static final String USER_LOG_RETAIN_HOURS = "mapreduce.job.userlog.retain.hours";
+
+  public static final String MAPREDUCE_JOB_USER_CLASSPATH_FIRST = "mapreduce.job.user.classpath.first";
+
+  public static final String IO_SORT_FACTOR = "mapreduce.task.io.sort.factor";
+
+  public static final String IO_SORT_MB = "mapreduce.task.io.sort.mb";
+
+  public static final String INDEX_CACHE_MEMORY_LIMIT = "mapreduce.task.index.cache.limit.bytes";
+
+  public static final String PRESERVE_FAILED_TASK_FILES = "mapreduce.task.files.preserve.failedtasks";
+
+  public static final String PRESERVE_FILES_PATTERN = "mapreduce.task.files.preserve.filepattern";
+
+  public static final String TASK_TEMP_DIR = "mapreduce.task.tmp.dir";
+
+  public static final String TASK_DEBUGOUT_LINES = "mapreduce.task.debugout.lines";
+
+  public static final String RECORDS_BEFORE_PROGRESS = "mapreduce.task.merge.progress.records";
+
+  public static final String SKIP_START_ATTEMPTS = "mapreduce.task.skip.start.attempts";
+
+  public static final String TASK_ATTEMPT_ID = "mapreduce.task.attempt.id";
+
+  public static final String TASK_ISMAP = "mapreduce.task.ismap";
+
+  public static final String TASK_PARTITION = "mapreduce.task.partition";
+
+  public static final String TASK_PROFILE = "mapreduce.task.profile";
+
+  public static final String TASK_PROFILE_PARAMS = "mapreduce.task.profile.params";
+
+  public static final String NUM_MAP_PROFILES = "mapreduce.task.profile.maps";
+
+  public static final String NUM_REDUCE_PROFILES = "mapreduce.task.profile.reduces";
+
+  public static final String TASK_MAP_PROFILE_PARAMS = "mapreduce.task.profile.map.params";
+  
+  public static final String TASK_REDUCE_PROFILE_PARAMS = "mapreduce.task.profile.reduce.params";
+  
+  public static final String TASK_TIMEOUT = "mapreduce.task.timeout";
+
+  public static final String TASK_TIMEOUT_CHECK_INTERVAL_MS = "mapreduce.task.timeout.check-interval-ms";
+  
+  public static final String TASK_ID = "mapreduce.task.id";
+
+  public static final String TASK_OUTPUT_DIR = "mapreduce.task.output.dir";
+
+  public static final String TASK_USERLOG_LIMIT = "mapreduce.task.userlog.limit.kb";
+
+  public static final String MAP_SORT_SPILL_PERCENT = "mapreduce.map.sort.spill.percent";
+
+  public static final String MAP_INPUT_FILE = "mapreduce.map.input.file";
+
+  public static final String MAP_INPUT_PATH = "mapreduce.map.input.length";
+
+  public static final String MAP_INPUT_START = "mapreduce.map.input.start";
+
+  public static final String MAP_MEMORY_MB = "mapreduce.map.memory.mb";
+  public static final int DEFAULT_MAP_MEMORY_MB = 1024;
+
+  public static final String MAP_CPU_VCORES = "mapreduce.map.cpu.vcores";
+  public static final int DEFAULT_MAP_CPU_VCORES = 1;
+
+  public static final String MAP_MEMORY_PHYSICAL_MB = "mapreduce.map.memory.physical.mb";
+
+  public static final String MAP_ENV = "mapreduce.map.env";
+
+  public static final String MAP_JAVA_OPTS = "mapreduce.map.java.opts";
+
+  public static final String MAP_MAX_ATTEMPTS = "mapreduce.map.maxattempts";
+
+  public static final String MAP_DEBUG_SCRIPT = "mapreduce.map.debug.script";
+
+  public static final String MAP_SPECULATIVE = "mapreduce.map.speculative";
+
+  public static final String MAP_FAILURES_MAX_PERCENT = "mapreduce.map.failures.maxpercent";
+
+  public static final String MAP_SKIP_INCR_PROC_COUNT = "mapreduce.map.skip.proc-count.auto-incr";
+
+  public static final String MAP_SKIP_MAX_RECORDS = "mapreduce.map.skip.maxrecords";
+
+  public static final String MAP_COMBINE_MIN_SPILLS = "mapreduce.map.combine.minspills";
+
+  public static final String MAP_OUTPUT_COMPRESS = "mapreduce.map.output.compress";
+
+  public static final String MAP_OUTPUT_COMPRESS_CODEC = "mapreduce.map.output.compress.codec";
+
+  public static final String MAP_OUTPUT_KEY_CLASS = "mapreduce.map.output.key.class";
+
+  public static final String MAP_OUTPUT_VALUE_CLASS = "mapreduce.map.output.value.class";
+
+  public static final String MAP_OUTPUT_KEY_FIELD_SEPERATOR = "mapreduce.map.output.key.field.separator";
+
+  public static final String MAP_LOG_LEVEL = "mapreduce.map.log.level";
+
+  public static final String REDUCE_LOG_LEVEL = "mapreduce.reduce.log.level";
+
+  public static final String DEFAULT_LOG_LEVEL = "INFO";
+
+  public static final String REDUCE_MERGE_INMEM_THRESHOLD = "mapreduce.reduce.merge.inmem.threshold";
+
+  public static final String REDUCE_INPUT_BUFFER_PERCENT = "mapreduce.reduce.input.buffer.percent";
+
+  public static final String REDUCE_MARKRESET_BUFFER_PERCENT = "mapreduce.reduce.markreset.buffer.percent";
+
+  public static final String REDUCE_MARKRESET_BUFFER_SIZE = "mapreduce.reduce.markreset.buffer.size";
+
+  public static final String REDUCE_MEMORY_PHYSICAL_MB = "mapreduce.reduce.memory.physical.mb";
+
+  public static final String REDUCE_MEMORY_MB = "mapreduce.reduce.memory.mb";
+  public static final int DEFAULT_REDUCE_MEMORY_MB = 1024;
+
+  public static final String REDUCE_CPU_VCORES = "mapreduce.reduce.cpu.vcores";
+  public static final int DEFAULT_REDUCE_CPU_VCORES = 1;
+
+  public static final String REDUCE_MEMORY_TOTAL_BYTES = "mapreduce.reduce.memory.totalbytes";
+
+  public static final String SHUFFLE_INPUT_BUFFER_PERCENT = "mapreduce.reduce.shuffle.input.buffer.percent";
+
+  public static final String SHUFFLE_MEMORY_LIMIT_PERCENT
+    = "mapreduce.reduce.shuffle.memory.limit.percent";
+
+  public static final String SHUFFLE_MERGE_PERCENT = "mapreduce.reduce.shuffle.merge.percent";
+
+  public static final String REDUCE_FAILURES_MAXPERCENT = "mapreduce.reduce.failures.maxpercent";
+
+  public static final String REDUCE_ENV = "mapreduce.reduce.env";
+
+  public static final String REDUCE_JAVA_OPTS = "mapreduce.reduce.java.opts";
+
+  public static final String MAPREDUCE_JOB_DIR = "mapreduce.job.dir";
+
+  public static final String REDUCE_MAX_ATTEMPTS = "mapreduce.reduce.maxattempts";
+
+  public static final String SHUFFLE_PARALLEL_COPIES = "mapreduce.reduce.shuffle.parallelcopies";
+
+  public static final String REDUCE_DEBUG_SCRIPT = "mapreduce.reduce.debug.script";
+
+  public static final String REDUCE_SPECULATIVE = "mapreduce.reduce.speculative";
+
+  public static final String SHUFFLE_CONNECT_TIMEOUT = "mapreduce.reduce.shuffle.connect.timeout";
+
+  public static final String SHUFFLE_READ_TIMEOUT = "mapreduce.reduce.shuffle.read.timeout";
+
+  public static final String SHUFFLE_FETCH_FAILURES = "mapreduce.reduce.shuffle.maxfetchfailures";
+
+  public static final String SHUFFLE_NOTIFY_READERROR = "mapreduce.reduce.shuffle.notify.readerror";
+
+  public static final String REDUCE_SKIP_INCR_PROC_COUNT = "mapreduce.reduce.skip.proc-count.auto-incr";
+
+  public static final String REDUCE_SKIP_MAXGROUPS = "mapreduce.reduce.skip.maxgroups";
+
+  public static final String REDUCE_MEMTOMEM_THRESHOLD = "mapreduce.reduce.merge.memtomem.threshold";
+
+  public static final String REDUCE_MEMTOMEM_ENABLED = "mapreduce.reduce.merge.memtomem.enabled";
+
+  public static final String COMBINE_RECORDS_BEFORE_PROGRESS = "mapreduce.task.combine.progress.records";
+
+  public static final String JOB_NAMENODES = "mapreduce.job.hdfs-servers";
+
+  public static final String JOB_JOBTRACKER_ID = "mapreduce.job.kerberos.jtprinicipal";
+
+  public static final String JOB_CANCEL_DELEGATION_TOKEN = "mapreduce.job.complete.cancel.delegation.tokens";
+
+  public static final String JOB_ACL_VIEW_JOB = "mapreduce.job.acl-view-job";
+
+  public static final String DEFAULT_JOB_ACL_VIEW_JOB = " ";
+
+  public static final String JOB_ACL_MODIFY_JOB = "mapreduce.job.acl-modify-job";
+
+  public static final String DEFAULT_JOB_ACL_MODIFY_JOB = " ";
+  
+  /* config for tracking the local file where all the credentials for the job
+   * credentials.
+   */
+  public static final String MAPREDUCE_JOB_CREDENTIALS_BINARY = 
+      "mapreduce.job.credentials.binary";
+
+  public static final String JOB_SUBMITHOST =
+    "mapreduce.job.submithostname";
+  public static final String JOB_SUBMITHOSTADDR =
+    "mapreduce.job.submithostaddress";
+
+  public static final String COUNTERS_MAX_KEY = "mapreduce.job.counters.max";
+  public static final int COUNTERS_MAX_DEFAULT = 120;
+
+  public static final String COUNTER_GROUP_NAME_MAX_KEY = "mapreduce.job.counters.group.name.max";
+  public static final int COUNTER_GROUP_NAME_MAX_DEFAULT = 128;
+
+  public static final String COUNTER_NAME_MAX_KEY = "mapreduce.job.counters.counter.name.max";
+  public static final int COUNTER_NAME_MAX_DEFAULT = 64;
+
+  public static final String COUNTER_GROUPS_MAX_KEY = "mapreduce.job.counters.groups.max";
+  public static final int COUNTER_GROUPS_MAX_DEFAULT = 50;
+  public static final String JOB_UBERTASK_ENABLE =
+    "mapreduce.job.ubertask.enable";
+  public static final String JOB_UBERTASK_MAXMAPS =
+    "mapreduce.job.ubertask.maxmaps";
+  public static final String JOB_UBERTASK_MAXREDUCES =
+    "mapreduce.job.ubertask.maxreduces";
+  public static final String JOB_UBERTASK_MAXBYTES =
+    "mapreduce.job.ubertask.maxbytes";
+
+  public static final String MR_PREFIX = "yarn.app.mapreduce.";
+
+  public static final String MR_AM_PREFIX = MR_PREFIX + "am.";
+
+  /** The number of client retires to the AM - before reconnecting to the RM
+   * to fetch Application State. 
+   */
+  public static final String MR_CLIENT_TO_AM_IPC_MAX_RETRIES = 
+    MR_PREFIX + "client-am.ipc.max-retries";
+  public static final int DEFAULT_MR_CLIENT_TO_AM_IPC_MAX_RETRIES = 3;
+  
+  /**
+   * The number of client retries to the RM/HS/AM before throwing exception.
+   */
+  public static final String MR_CLIENT_MAX_RETRIES = 
+    MR_PREFIX + "client.max-retries";
+  public static final int DEFAULT_MR_CLIENT_MAX_RETRIES = 3;
+  
+  /** The staging directory for map reduce.*/
+  public static final String MR_AM_STAGING_DIR = 
+    MR_AM_PREFIX+"staging-dir";
+  public static final String DEFAULT_MR_AM_STAGING_DIR = 
+    "/tmp/hadoop-yarn/staging";
+
+  /** The amount of memory the MR app master needs.*/
+  public static final String MR_AM_VMEM_MB =
+    MR_AM_PREFIX+"resource.mb";
+  public static final int DEFAULT_MR_AM_VMEM_MB = 1536;
+
+  /** The number of virtual cores the MR app master needs.*/
+  public static final String MR_AM_CPU_VCORES =
+    MR_AM_PREFIX+"resource.cpu-vcores";
+  public static final int DEFAULT_MR_AM_CPU_VCORES = 1;
+
+  /** Command line arguments passed to the MR app master.*/
+  public static final String MR_AM_COMMAND_OPTS =
+    MR_AM_PREFIX+"command-opts";
+  public static final String DEFAULT_MR_AM_COMMAND_OPTS = "-Xmx1024m";
+
+  /** Admin command opts passed to the MR app master.*/
+  public static final String MR_AM_ADMIN_COMMAND_OPTS =
+      MR_AM_PREFIX+"admin-command-opts";
+  public static final String DEFAULT_MR_AM_ADMIN_COMMAND_OPTS = "";
+
+  /** Root Logging level passed to the MR app master.*/
+  public static final String MR_AM_LOG_LEVEL = 
+    MR_AM_PREFIX+"log.level";
+  public static final String DEFAULT_MR_AM_LOG_LEVEL = "INFO";
+
+  /**The number of splits when reporting progress in MR*/
+  public static final String MR_AM_NUM_PROGRESS_SPLITS = 
+    MR_AM_PREFIX+"num-progress-splits";
+  public static final int DEFAULT_MR_AM_NUM_PROGRESS_SPLITS = 12;
+
+  /**
+   * Upper limit on the number of threads user to launch containers in the app
+   * master. Expect level config, you shouldn't be needing it in most cases.
+   */
+  public static final String MR_AM_CONTAINERLAUNCHER_THREAD_COUNT_LIMIT =
+    MR_AM_PREFIX+"containerlauncher.thread-count-limit";
+
+  public static final int DEFAULT_MR_AM_CONTAINERLAUNCHER_THREAD_COUNT_LIMIT = 
+      500;
+
+  /** Number of threads to handle job client RPC requests.*/
+  public static final String MR_AM_JOB_CLIENT_THREAD_COUNT =
+    MR_AM_PREFIX + "job.client.thread-count";
+  public static final int DEFAULT_MR_AM_JOB_CLIENT_THREAD_COUNT = 1;
+
+  /** 
+   * Range of ports that the MapReduce AM can use when binding. Leave blank
+   * if you want all possible ports.
+   */
+  public static final String MR_AM_JOB_CLIENT_PORT_RANGE = 
+    MR_AM_PREFIX + "job.client.port-range";
+  
+  /** Enable blacklisting of nodes in the job.*/
+  public static final String MR_AM_JOB_NODE_BLACKLISTING_ENABLE = 
+    MR_AM_PREFIX  + "job.node-blacklisting.enable";
+
+  /** Ignore blacklisting if a certain percentage of nodes have been blacklisted */
+  public static final String MR_AM_IGNORE_BLACKLISTING_BLACKLISTED_NODE_PERECENT =
+      MR_AM_PREFIX + "job.node-blacklisting.ignore-threshold-node-percent";
+  public static final int DEFAULT_MR_AM_IGNORE_BLACKLISTING_BLACKLISTED_NODE_PERCENT =
+      33;
+  
+  /** Enable job recovery.*/
+  public static final String MR_AM_JOB_RECOVERY_ENABLE = 
+    MR_AM_PREFIX + "job.recovery.enable";
+
+  /** 
+   * Limit on the number of reducers that can be preempted to ensure that at
+   *  least one map task can run if it needs to. Percentage between 0.0 and 1.0
+   */
+  public static final String MR_AM_JOB_REDUCE_PREEMPTION_LIMIT = 
+    MR_AM_PREFIX  + "job.reduce.preemption.limit";
+  public static final float DEFAULT_MR_AM_JOB_REDUCE_PREEMPTION_LIMIT = 0.5f;
+  
+  /** AM ACL disabled. **/
+  public static final String JOB_AM_ACCESS_DISABLED = 
+    "mapreduce.job.am-access-disabled";
+  public static final boolean DEFAULT_JOB_AM_ACCESS_DISABLED = false;
+
+  /**
+   * Limit reduces starting until a certain percentage of maps have finished.
+   *  Percentage between 0.0 and 1.0
+   */
+  public static final String MR_AM_JOB_REDUCE_RAMPUP_UP_LIMIT = 
+    MR_AM_PREFIX  + "job.reduce.rampup.limit";
+  public static final float DEFAULT_MR_AM_JOB_REDUCE_RAMP_UP_LIMIT = 0.5f;
+
+  /** The class that should be used for speculative execution calculations.*/
+  public static final String MR_AM_JOB_SPECULATOR =
+    MR_AM_PREFIX + "job.speculator.class";
+
+  /** Class used to estimate task resource needs.*/
+  public static final String MR_AM_TASK_ESTIMATOR =
+    MR_AM_PREFIX + "job.task.estimator.class";
+
+  /** The lambda value in the smoothing function of the task estimator.*/
+  public static final String MR_AM_TASK_ESTIMATOR_SMOOTH_LAMBDA_MS =
+    MR_AM_PREFIX
+    + "job.task.estimator.exponential.smooth.lambda-ms";
+
+  public static final long DEFAULT_MR_AM_TASK_ESTIMATOR_SMOOTH_LAMBDA_MS = 
+  1000L * 60;
+
+  /** true if the smoothing rate should be exponential.*/
+  public static final String MR_AM_TASK_ESTIMATOR_EXPONENTIAL_RATE_ENABLE =
+    MR_AM_PREFIX + "job.task.estimator.exponential.smooth.rate";
+
+  /** The number of threads used to handle task RPC calls.*/
+  public static final String MR_AM_TASK_LISTENER_THREAD_COUNT =
+    MR_AM_PREFIX + "job.task.listener.thread-count";
+  public static final int DEFAULT_MR_AM_TASK_LISTENER_THREAD_COUNT = 30;
+
+  /** How often the AM should schedule assigning tasks with allocated
+   * containers.*/
+  public static final String MR_AM_SCHEDULER_INTERVAL =
+    MR_AM_PREFIX + "scheduler.interval-ms";
+  public static final long DEFAULT_MR_AM_SCHEDULER_INTERVAL = 1000l;
+
+  /** How often the AM should send heartbeats to the RM.*/
+  public static final String MR_AM_TO_RM_HEARTBEAT_INTERVAL_MS =
+    MR_AM_PREFIX + "scheduler.heartbeat.interval-ms";
+  public static final int DEFAULT_MR_AM_TO_RM_HEARTBEAT_INTERVAL_MS = 1000;
+
+  /**
+   * If contact with RM is lost, the AM will wait MR_AM_TO_RM_WAIT_INTERVAL_MS
+   * milliseconds before aborting. During this interval, AM will still try
+   * to contact the RM.
+   */
+  public static final String MR_AM_TO_RM_WAIT_INTERVAL_MS =
+    MR_AM_PREFIX + "scheduler.connection.wait.interval-ms";
+  public static final int DEFAULT_MR_AM_TO_RM_WAIT_INTERVAL_MS = 360000;
+
+  /**
+   * Boolean. Create the base dirs in the JobHistoryEventHandler
+   * Set to false for multi-user clusters.  This is an internal config that
+   * is set by the MR framework and read by it too.
+   */
+  public static final String MR_AM_CREATE_JH_INTERMEDIATE_BASE_DIR = 
+    MR_AM_PREFIX + "create-intermediate-jh-base-dir";
+  
+  public static final String MR_AM_HISTORY_MAX_UNFLUSHED_COMPLETE_EVENTS =
+      MR_AM_PREFIX + "history.max-unflushed-events";
+  public static final int DEFAULT_MR_AM_HISTORY_MAX_UNFLUSHED_COMPLETE_EVENTS =
+      200;
+
+  public static final String MR_AM_HISTORY_JOB_COMPLETE_UNFLUSHED_MULTIPLIER =
+      MR_AM_PREFIX + "history.job-complete-unflushed-multiplier";
+  public static final int DEFAULT_MR_AM_HISTORY_JOB_COMPLETE_UNFLUSHED_MULTIPLIER =
+      30;
+
+  public static final String MR_AM_HISTORY_COMPLETE_EVENT_FLUSH_TIMEOUT_MS =
+      MR_AM_PREFIX + "history.complete-event-flush-timeout";
+  public static final long DEFAULT_MR_AM_HISTORY_COMPLETE_EVENT_FLUSH_TIMEOUT_MS =
+      30 * 1000l;
+
+  public static final String MR_AM_HISTORY_USE_BATCHED_FLUSH_QUEUE_SIZE_THRESHOLD =
+      MR_AM_PREFIX + "history.use-batched-flush.queue-size.threshold";
+  public static final int DEFAULT_MR_AM_HISTORY_USE_BATCHED_FLUSH_QUEUE_SIZE_THRESHOLD =
+      50;
+  
+  public static final String MR_AM_ENV =
+      MR_AM_PREFIX + "env";
+  
+  public static final String MR_AM_ADMIN_USER_ENV =
+      MR_AM_PREFIX + "admin.user.env";
+
+  public static final String MAPRED_MAP_ADMIN_JAVA_OPTS =
+      "mapreduce.admin.map.child.java.opts";
+
+  public static final String MAPRED_REDUCE_ADMIN_JAVA_OPTS =
+      "mapreduce.admin.reduce.child.java.opts";
+
+  public static final String DEFAULT_MAPRED_ADMIN_JAVA_OPTS =
+      "-Djava.net.preferIPv4Stack=true " +
+          "-Dhadoop.metrics.log.level=WARN ";
+
+  public static final String MAPRED_ADMIN_USER_SHELL =
+      "mapreduce.admin.user.shell";
+
+  public static final String DEFAULT_SHELL = "/bin/bash";
+
+  public static final String MAPRED_ADMIN_USER_ENV =
+      "mapreduce.admin.user.env";
+
+  public static final String DEFAULT_MAPRED_ADMIN_USER_ENV =
+      "LD_LIBRARY_PATH=$HADOOP_COMMON_HOME/lib/native";
+
+  public static final String WORKDIR = "work";
+
+  public static final String OUTPUT = "output";
+
+  public static final String HADOOP_WORK_DIR = "HADOOP_WORK_DIR";
+
+  // Environment variables used by Pipes. (TODO: these
+  // do not appear to be used by current pipes source code!)
+  public static final String STDOUT_LOGFILE_ENV = "STDOUT_LOGFILE_ENV";
+  public static final String STDERR_LOGFILE_ENV = "STDERR_LOGFILE_ENV";
+
+  public static final String APPLICATION_ATTEMPT_ID_ENV = "APPLICATION_ATTEMPT_ID_ENV";
+
+  // This should be the directory where splits file gets localized on the node
+  // running ApplicationMaster.
+  public static final String JOB_SUBMIT_DIR = "jobSubmitDir";
+
+  // This should be the name of the localized job-configuration file on the node
+  // running ApplicationMaster and Task
+  public static final String JOB_CONF_FILE = "job.xml";
+
+  // This should be the name of the localized job-jar file on the node running
+  // individual containers/tasks.
+  public static final String JOB_JAR = "job.jar";
+
+  public static final String JOB_SPLIT = "job.split";
+
+  public static final String JOB_SPLIT_METAINFO = "job.splitmetainfo";
+
+  public static final String APPLICATION_MASTER_CLASS =
+      "org.apache.tez.dag.app.DAGAppMaster";
+
+  // The token file for the application. Should contain tokens for access to
+  // remote file system and may optionally contain application specific tokens.
+  // For now, generated by the AppManagers and used by NodeManagers and the
+  // Containers.
+  public static final String APPLICATION_TOKENS_FILE = "appTokens";
+  
+  /** The log directory for the containers */
+  public static final String TASK_LOG_DIR = MR_PREFIX + "container.log.dir";
+  
+  public static final String TASK_LOG_SIZE = MR_PREFIX + "container.log.filesize";
+  
+  public static final String MAPREDUCE_V2_CHILD_CLASS = 
+      "org.apache.hadoop.mapred.YarnChild";
+
+  public static final String APPLICATION_ATTEMPT_ID =
+      "mapreduce.job.application.attempt.id";
+
+  /**
+   * Job end notification.
+   */
+  public static final String MR_JOB_END_NOTIFICATION_URL =
+    "mapreduce.job.end-notification.url";
+
+  public static final String MR_JOB_END_NOTIFICATION_PROXY =
+    "mapreduce.job.end-notification.proxy";
+
+  public static final String MR_JOB_END_RETRY_ATTEMPTS =
+    "mapreduce.job.end-notification.retry.attempts";
+
+  public static final String MR_JOB_END_RETRY_INTERVAL =
+    "mapreduce.job.end-notification.retry.interval";
+
+  public static final String MR_JOB_END_NOTIFICATION_MAX_ATTEMPTS =
+    "mapreduce.job.end-notification.max.attempts";
+
+  public static final String MR_JOB_END_NOTIFICATION_MAX_RETRY_INTERVAL =
+    "mapreduce.job.end-notification.max.retry.interval";
+
+  /*
+   * MR AM Service Authorization
+   */
+  public static final String   
+  MR_AM_SECURITY_SERVICE_AUTHORIZATION_TASK_UMBILICAL =
+      "security.job.task.protocol.acl";
+  public static final String   
+  MR_AM_SECURITY_SERVICE_AUTHORIZATION_CLIENT =
+      "security.job.client.protocol.acl";
+
+  /**
+   * CLASSPATH for all YARN MapReduce applications.
+   */
+  public static final String MAPREDUCE_APPLICATION_CLASSPATH = 
+      "mapreduce.application.classpath";
+
+  /**
+   * Default CLASSPATH for all YARN MapReduce applications.
+   */
+  public static final String[] DEFAULT_MAPREDUCE_APPLICATION_CLASSPATH = {
+      "$HADOOP_MAPRED_HOME/share/hadoop/mapreduce/*",
+      "$HADOOP_MAPRED_HOME/share/hadoop/mapreduce/lib/*",
+  };
+
+
+  // TODO Fix this. Not accessible in JobClient
+  /* do we need a HS delegation token for this client */
+  @InterfaceAudience.Private
+  static final String HS_DELEGATION_TOKEN_REQUIRED
+      = "mapreduce.history.server.delegationtoken.required";
+
+  // MRR related config properties
+
+  public static final String MRR_INTERMEDIATE_STAGES =
+      "mrr.intermediate.num-stages";
+
+  public static final String MRR_INTERMEDIATE_STAGE_PREFIX =
+      "mrr.intermediate.stage.";
+
+  // Stage specific properties
+  // Format of each property is mapred.ireducer.stage.<stage-num>.<suffix>
+  // where suffix is one of MRR_INTERMEDIATE_STAGE_* fields defined below.
+//  public static final String MRR_INTERMEDIATE_STAGE_TASKS = "tasks";
+//  public static final String MRR_INTERMEDIATE_STAGE_CLASS = "class";
+//  public static final String
+//      MRR_INTERMEDIATE_STAGE_PARTITIONER_CLASS = "partitioner.class";
+//  public static final String
+//      MRR_INTERMEDIATE_STAGE_COMBINER_CLASS = "combiner.class";
+//  public static final String MRR_INTERMEDIATE_STAGE_OUTPUT_COMPRESS =
+//      "output.compress";
+//  public static final String MRR_INTERMEDIATE_STAGE_OUTPUT_COMPRESSION_CODEC =
+//      "output.compression.codec";
+//  public static final String MRR_INTERMEDIATE_STAGE_OUTPUT_KEY_CLASS =
+//      "key.class";
+//  public static final String MRR_INTERMEDIATE_STAGE_OUTPUT_KEY_COMPARATOR_CLASS
+//    = "key.comparator.class";
+//  public static final String MRR_INTERMEDIATE_STAGE_OUTPUT_VALUE_CLASS =
+//      "value.class";
+//  public static final String MRR_INTERMEDIATE_STAGE_SPECULATE =
+//      "speculate";
+//  public static final String MRR_INTERMEDIATE_STAGE_MEMORY_MB =
+//      "memory.mb";
+//  public static final String MRR_INTERMEDIATE_STAGE_CHILD_JAVA_OPTS =
+//      "child.java.opts";
+
+}

Propchange: incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRJobConfig.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRTaskStatus.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRTaskStatus.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRTaskStatus.java (added)
+++ incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRTaskStatus.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,300 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.mapreduce.hadoop;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.mapred.TaskStatus;
+import org.apache.tez.common.TezTaskStatus;
+import org.apache.tez.common.counters.TezCounters;
+import org.apache.tez.engine.records.TezTaskAttemptID;
+
+public class MRTaskStatus implements TezTaskStatus {
+
+  static final Log LOG =
+      LogFactory.getLog(TaskStatus.class.getName());
+  // max task-status string size
+  static final int MAX_STRING_SIZE = 1024;
+
+  private TezTaskAttemptID taskAttemptId;
+  private State state = State.UNASSIGNED;
+  private float progress = 0.0f;
+  private String diagnostics = "";
+  private String userStatusInfo = "";
+  private Phase phase;
+  private TezCounters counters;
+  
+  private long localOutputSize;
+  List<TezTaskAttemptID> failedTaskDependencies = 
+      new ArrayList<TezTaskAttemptID>();
+  
+  private long startTime;
+  private long finishTime;
+  private long sortFinishTime;
+  private long mapFinishTime;
+  private long shuffleFinishTime;
+  
+  // For serialization.
+  public MRTaskStatus() {
+  }
+  
+  public MRTaskStatus(
+      TezTaskAttemptID taskAttemptId,  
+      TezCounters counters, Phase phase) {
+    this.taskAttemptId = taskAttemptId;
+    this.counters = counters;
+    this.phase = phase;
+  }
+  
+  @Override
+  public TezTaskAttemptID getTaskAttemptId() {
+    return taskAttemptId;
+  }
+
+  @Override
+  public float getProgress() {
+    return progress; 
+  }
+
+  @Override
+  public void setProgress(float progress) {
+    this.progress = progress;
+  }
+
+  @Override
+  public State getRunState() {
+    return state;
+  }
+
+  @Override
+  public void setRunState(State state) {
+    this.state = state;
+  }
+
+  @Override
+  public String getDiagnosticInfo() {
+    return diagnostics;
+  }
+
+  @Override
+  public void setDiagnosticInfo(String info) {
+    this.diagnostics = info;
+  }
+
+  @Override
+  public String getStateString() {
+    return userStatusInfo;
+  }
+
+  @Override
+  public void setStateString(String userStatusInfo) {
+    this.userStatusInfo = userStatusInfo;
+  }
+
+  @Override
+  public long getFinishTime() {
+    return finishTime;
+  }
+
+  @Override
+  public long getShuffleFinishTime() {
+    return shuffleFinishTime;
+  }
+
+  @Override
+  public long getMapFinishTime() {
+    return mapFinishTime;
+  }
+
+  @Override
+  public long getSortFinishTime() {
+    return sortFinishTime;
+  }
+
+  @Override
+  public long getStartTime() {
+    return startTime;
+  }
+
+  @Override
+  public Phase getPhase() {
+    return phase;
+  }
+
+  @Override
+  public void setPhase(Phase phase) {
+    Phase oldPhase = getPhase();
+    if (oldPhase != phase) {
+      // sort phase started
+      if (phase == Phase.SORT){
+        if (oldPhase == Phase.MAP) {
+          setMapFinishTime(System.currentTimeMillis());
+        } else {
+          setShuffleFinishTime(System.currentTimeMillis());
+        }
+      } else if (phase == Phase.REDUCE) {
+        setSortFinishTime(System.currentTimeMillis());
+      }
+      this.phase = phase;
+    }
+  }
+
+  @Override
+  public TezCounters getCounters() {
+    return counters;
+  }
+
+  @Override
+  public void setCounters(TezCounters counters) {
+    this.counters = counters;
+  }
+
+  @Override
+  public long getLocalOutputSize() {
+    return localOutputSize;
+  }
+
+  @Override
+  public List<TezTaskAttemptID> getFailedDependencies() {
+    return failedTaskDependencies;
+  }
+
+  @Override
+  public void addFailedDependency(TezTaskAttemptID taskAttemptId) {
+    failedTaskDependencies.add(taskAttemptId);
+  }
+
+  @Override
+  synchronized public void clearStatus() {
+    userStatusInfo = "";
+    failedTaskDependencies.clear();
+  }
+
+  @Override
+  synchronized public void statusUpdate(
+      float progress, String userDiagnosticInfo, TezCounters counters) {
+    setProgress(progress);
+    setDiagnosticInfo(userDiagnosticInfo);
+    setCounters(counters);
+  }
+
+  @Override
+  public void setOutputSize(long localOutputSize) {
+    this.localOutputSize = localOutputSize;
+  }
+
+  @Override
+  public void setFinishTime(long finishTime) {
+    if(this.getStartTime() > 0 && finishTime > 0) {
+      if (getShuffleFinishTime() == 0) {
+        setShuffleFinishTime(finishTime);
+      }
+      if (getSortFinishTime() == 0){
+        setSortFinishTime(finishTime);
+      }
+      if (getMapFinishTime() == 0) {
+        setMapFinishTime(finishTime);
+      }
+      this.finishTime = finishTime;
+    }
+  }
+
+  @Override
+  public void setShuffleFinishTime(long shuffleFinishTime) {
+    this.shuffleFinishTime = shuffleFinishTime;
+  }
+
+  @Override
+  public void setMapFinishTime(long mapFinishTime) {
+    this.mapFinishTime = mapFinishTime;
+  }
+
+  @Override
+  public void setSortFinishTime(long sortFinishTime) {
+    this.sortFinishTime = sortFinishTime;
+    if (getShuffleFinishTime() == this.shuffleFinishTime ){
+      setShuffleFinishTime(sortFinishTime);
+    }
+  }
+
+  @Override
+  public void setStartTime(long startTime) {
+    this.startTime = startTime;
+  }
+
+  
+  @Override
+  public void write(DataOutput out) throws IOException {
+    taskAttemptId.write(out);
+    WritableUtils.writeEnum(out, state);
+    out.writeFloat(progress);
+    WritableUtils.writeString(out, diagnostics);
+    WritableUtils.writeString(out, userStatusInfo);
+    WritableUtils.writeEnum(out, phase);
+
+    counters.write(out);
+    
+    out.writeLong(localOutputSize);
+    out.writeLong(startTime);
+    out.writeLong(finishTime);
+    out.writeLong(sortFinishTime);
+    out.writeLong(mapFinishTime);
+    out.writeLong(shuffleFinishTime);
+
+    out.writeInt(failedTaskDependencies.size());
+    for(TezTaskAttemptID taskAttemptId : failedTaskDependencies) {
+      taskAttemptId.write(out);
+    }
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    taskAttemptId = TezTaskAttemptID.read(in);
+    state = WritableUtils.readEnum(in, State.class);
+    progress = in.readFloat();
+    diagnostics = WritableUtils.readString(in);
+    userStatusInfo = WritableUtils.readString(in);
+    phase = WritableUtils.readEnum(in, Phase.class);
+    counters = new TezCounters();
+    
+    counters.readFields(in);
+    
+    localOutputSize = in.readLong();
+    startTime = in.readLong();
+    finishTime = in.readLong();
+    sortFinishTime = in.readLong();
+    mapFinishTime = in.readLong();
+    shuffleFinishTime = in.readLong();
+    
+    int numFailedDependencies = in.readInt();
+    for (int i = 0 ; i < numFailedDependencies ; i++) {
+      TezTaskAttemptID taskAttemptId = TezTaskAttemptID.read(in);
+      failedTaskDependencies.add(taskAttemptId);
+    }
+    
+  }
+  
+}

Propchange: incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRTaskStatus.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRTaskType.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRTaskType.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRTaskType.java (added)
+++ incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRTaskType.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.mapreduce.hadoop;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * Enum for map, reduce, job-setup, job-cleanup, task-cleanup task types.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Stable
+public enum MRTaskType {
+
+  MAP, REDUCE, JOB_SETUP, JOB_CLEANUP, TASK_CLEANUP;
+
+  public String toString() {
+    switch (this) {
+      case MAP:
+        return "m";
+      case REDUCE:
+        return "r";
+      default:
+        return this.name();
+    }
+  }
+
+  public static MRTaskType fromString(String taskTypeString) {
+    if (taskTypeString.equals("m") || taskTypeString.equals(MRTaskType.MAP.toString())) {
+      return MRTaskType.MAP;
+    } else if (taskTypeString.equals("r") || taskTypeString.equals(MRTaskType.REDUCE.toString())) {
+      return MRTaskType.REDUCE;
+    } else {
+      return MRTaskType.valueOf(taskTypeString);
+    }
+  }
+  
+  public String toSerializedString() {
+    return this.name();
+  }
+}

Propchange: incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRTaskType.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MultiStageMRConfigUtil.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MultiStageMRConfigUtil.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MultiStageMRConfigUtil.java (added)
+++ incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MultiStageMRConfigUtil.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,120 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.mapreduce.hadoop;
+
+import java.util.Iterator;
+import java.util.Map.Entry;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.lib.reduce.IntSumReducer;
+
+import com.google.common.base.Preconditions;
+
+public class MultiStageMRConfigUtil {
+
+  // TODO MRR FIXME based on conf format.
+  // Returns a complete conf object including non-intermediate stage conf.
+  public static Configuration getIntermediateStageConf(Configuration baseConf,
+      int i) {
+    String base = getPropertyNameForStage(i, "");
+    Configuration conf = new Configuration(false);
+    Iterator<Entry<String, String>> confEntries = baseConf.iterator();
+    while (confEntries.hasNext()) {
+      Entry<String, String> entry = confEntries.next();
+      String key = entry.getKey();
+      if (key.startsWith(base)) {
+        conf.set(key.replace(base, ""), entry.getValue());
+      } else {
+        conf.set(key, entry.getValue());
+      }
+    }
+    return conf;
+  }
+
+  // TODO MRR FIXME based on conf format.
+  // Returns config settings specific to stage-i only.
+  public static Configuration getBasicIntermediateStageConf(
+      Configuration baseConf, int i) {
+    String base = getPropertyNameForStage(i, "");
+    Configuration conf = new Configuration(false);
+    Iterator<Entry<String, String>> confEntries = baseConf.iterator();
+    while (confEntries.hasNext()) {
+      Entry<String, String> entry = confEntries.next();
+      String key = entry.getKey();
+      if (key.startsWith(base)) {
+        conf.set(key.replace(base, ""), entry.getValue());
+      }
+    }
+    return conf;
+  }
+
+  // TODO MRR FIXME based on conf format.
+  public static int getNumIntermediateStages(Configuration conf) {
+    return conf.getInt(MRJobConfig.MRR_INTERMEDIATE_STAGES, 0);
+  }
+
+  // TODO MRR FIXME based on conf format.
+  public static String getPropertyNameForStage(int intermediateStage,
+      String originalPropertyName) {
+    return MRJobConfig.MRR_INTERMEDIATE_STAGE_PREFIX + intermediateStage + "."
+        + originalPropertyName;
+  }
+
+  public static void main(String[] args) {
+    Configuration baseConf = new Configuration();
+    baseConf.setInt(MRJobConfig.MRR_INTERMEDIATE_STAGES, 1);
+    baseConf.setClass(MultiStageMRConfigUtil.getPropertyNameForStage(1,
+        "mapreduce.job.combine.class"), IntSumReducer.class, Reducer.class);
+    baseConf.setClass(MultiStageMRConfigUtil.getPropertyNameForStage(1,
+        "mapreduce.job.reduce.class"), IntSumReducer.class, Reducer.class);
+
+    Configuration conf = getBasicIntermediateStageConf(baseConf, 1);
+    printConf(conf);
+  }
+  
+  private static String IREDUCE_PREFIX = "ireduce";
+  
+  public static String getIntermediateReduceVertexName(int i) {
+    return "ireduce" + i;
+  }
+
+  public static boolean isIntermediateReduceStage(String vertexName) {
+    return vertexName.startsWith(IREDUCE_PREFIX);
+  }
+  
+  public static int getIntermediateReduceStageNum(String vertexName) {
+    Preconditions.checkArgument(vertexName.startsWith(IREDUCE_PREFIX),
+        "IntermediateReduce vertex name must start with prefix: "
+            + IREDUCE_PREFIX);
+    String stageNumString = vertexName.substring(IREDUCE_PREFIX.length());
+    return Integer.valueOf(stageNumString);
+  }
+
+  // TODO Get rid of this. Temporary for testing.
+  public static void printConf(Configuration conf) {
+    Iterator<Entry<String, String>> confEntries = conf.iterator();
+    while (confEntries.hasNext()) {
+      Entry<String, String> entry = confEntries.next();
+      String key = entry.getKey();
+      String value = entry.getValue();
+      System.err.println("Key: " + key + ", Value: " + value);
+    }
+  }
+}

Propchange: incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MultiStageMRConfigUtil.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/TezTaskUmbilicalProtocol.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/TezTaskUmbilicalProtocol.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/TezTaskUmbilicalProtocol.java (added)
+++ incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/TezTaskUmbilicalProtocol.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.mapreduce.hadoop;
+
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.tez.common.ContainerTask;
+import org.apache.tez.common.TezTaskStatus;
+import org.apache.tez.engine.api.Master;
+import org.apache.tez.engine.records.OutputContext;
+import org.apache.tez.engine.records.TezTaskAttemptID;
+import org.apache.tez.mapreduce.hadoop.records.ProceedToCompletionResponse;
+
+/** Protocol that task child process uses to contact its parent process.  The
+ * parent is a daemon which which polls the central master for a new map or
+ * reduce task and runs it as a child process.  All communication between child
+ * and parent is via this protocol. */ 
+@InterfaceAudience.Private
+@InterfaceStability.Stable
+public interface TezTaskUmbilicalProtocol extends Master {
+
+  public static final long versionID = 19L;
+
+  ContainerTask getTask(ContainerContext containerContext) throws IOException;
+  
+  boolean statusUpdate(TezTaskAttemptID taskId, TezTaskStatus taskStatus) 
+  throws IOException, InterruptedException;
+  
+  void reportDiagnosticInfo(TezTaskAttemptID taskid, String trace) throws IOException;
+  
+  boolean ping(TezTaskAttemptID taskid) throws IOException;
+
+  void done(TezTaskAttemptID taskid) throws IOException;
+  
+  void commitPending(TezTaskAttemptID taskId, TezTaskStatus taskStatus) 
+  throws IOException, InterruptedException;  
+
+  boolean canCommit(TezTaskAttemptID taskid) throws IOException;
+
+  void shuffleError(TezTaskAttemptID taskId, String message) throws IOException;
+  
+  void fsError(TezTaskAttemptID taskId, String message) throws IOException;
+
+  void fatalError(TezTaskAttemptID taskId, String message) throws IOException;
+  
+  // TODO TEZAM5 Can commitPending and outputReady be collapsed into a single
+  // call.
+  // IAC outputReady followed by commit is a little confusing - since the output
+  // isn't really in place till a commit is called. Maybe rename to
+  // processingComplete or some such.
+  
+  // TODO EVENTUALLY This is not the most useful API. Once there's some kind of
+  // support for the Task handing output over to the Container, this won't rally
+  // be required. i.e. InMemShuffle running as a service in the Container, or
+  // the second task in getTask(). ContainerUmbilical would include getTask and
+  // getServices...
+  
+  void outputReady(TezTaskAttemptID taskAttemptId, OutputContext outputContext)
+      throws IOException;
+  
+  ProceedToCompletionResponse
+      proceedToCompletion(TezTaskAttemptID taskAttemptId) throws IOException;
+}

Propchange: incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/TezTaskUmbilicalProtocol.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/TezTypeConverters.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/TezTypeConverters.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/TezTypeConverters.java (added)
+++ incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/TezTypeConverters.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.tez.mapreduce.hadoop;
+
+import org.apache.hadoop.mapred.TaskAttemptID;
+import org.apache.hadoop.mapreduce.Counter;
+import org.apache.hadoop.mapreduce.Counters;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.TypeConverter;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptCompletionEventStatus;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
+import org.apache.tez.common.TezTaskStatus.Phase;
+import org.apache.tez.common.counters.CounterGroup;
+import org.apache.tez.common.counters.TezCounter;
+import org.apache.tez.common.counters.TezCounters;
+import org.apache.tez.engine.records.TezDAGID;
+import org.apache.tez.engine.records.TezDependentTaskCompletionEvent;
+import org.apache.tez.engine.records.TezTaskAttemptID;
+
+public class TezTypeConverters {
+
+  // Tez objects will be imported. Others will use the fully qualified name when
+  // required.
+  // All public methods named toYarn / toTez / toMapReduce
+
+  public static org.apache.hadoop.mapreduce.JobID toJobID(TezDAGID id) {
+    return new JobID(
+        String.valueOf(id.getApplicationId().getClusterTimestamp()), id.getId());
+
+  }
+  
+  public static org.apache.hadoop.mapreduce.v2.api.records.Phase toYarn(
+      Phase phase) {
+    return org.apache.hadoop.mapreduce.v2.api.records.Phase.valueOf(phase
+        .name());
+  }
+
+  public static TaskAttemptId toYarn(TezTaskAttemptID taskAttemptId) {
+    TaskAttemptID mrTaskAttemptId = IDConverter
+        .toMRTaskAttemptId(taskAttemptId);
+    TaskAttemptId mrv2TaskAttemptId = TypeConverter.toYarn(mrTaskAttemptId);
+    return mrv2TaskAttemptId;
+  }
+
+  public static TezTaskAttemptID toTez(TaskAttemptId taskAttemptId) {
+    TaskAttemptID mrTaskAttemptId = TypeConverter.fromYarn(taskAttemptId);
+    TezTaskAttemptID tezTaskAttemptId = IDConverter
+        .fromMRTaskAttemptId(mrTaskAttemptId);
+    return tezTaskAttemptId;
+  }
+
+  public static TezDependentTaskCompletionEvent.Status toTez(
+      TaskAttemptCompletionEventStatus status) {
+    return TezDependentTaskCompletionEvent.Status.valueOf(status.toString());
+  }
+
+  
+  
+  public static Counters fromTez(TezCounters tezCounters) {
+    if (tezCounters == null) {
+      return null;
+    }
+    Counters counters = new Counters();
+    for (CounterGroup xGrp : tezCounters) {
+      counters.addGroup(xGrp.getName(), xGrp.getDisplayName());
+      for (TezCounter xCounter : xGrp) {
+        Counter counter =
+            counters.findCounter(xGrp.getName(), xCounter.getName());
+        counter.setValue(xCounter.getValue());
+
+      }
+    }
+    return counters;
+  }
+}

Propchange: incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/TezTypeConverters.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/mapred/JobContextImpl.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/mapred/JobContextImpl.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/mapred/JobContextImpl.java (added)
+++ incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/mapred/JobContextImpl.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,65 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.mapreduce.hadoop.mapred;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.JobContext;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.util.Progressable;
+import org.apache.tez.engine.records.TezDAGID;
+import org.apache.tez.mapreduce.hadoop.IDConverter;
+
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class JobContextImpl 
+    extends org.apache.hadoop.mapreduce.task.JobContextImpl 
+    implements JobContext {
+  private JobConf job;
+  private Progressable progress;
+
+  public JobContextImpl(JobConf conf, TezDAGID dagId,
+                 Progressable progress) {
+    super(conf, IDConverter.toMRJobId(dagId));
+    this.job = conf;
+    this.progress = progress;
+  }
+
+  public JobContextImpl(JobConf conf, TezDAGID dagId) {
+    this(conf, dagId, Reporter.NULL);
+  }
+  
+  /**
+   * Get the job Configuration
+   * 
+   * @return JobConf
+   */
+  public JobConf getJobConf() {
+    return job;
+  }
+  
+  /**
+   * Get the progress mechanism for reporting progress.
+   * 
+   * @return progress mechanism 
+   */
+  public Progressable getProgressible() {
+    return progress;
+  }
+}

Propchange: incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/mapred/JobContextImpl.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/mapred/MRCounters.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/mapred/MRCounters.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/mapred/MRCounters.java (added)
+++ incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/mapred/MRCounters.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,312 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.mapreduce.hadoop.mapred;
+
+import static org.apache.hadoop.mapreduce.util.CountersStrings.toEscapedCompactString;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Iterator;
+
+import org.apache.commons.collections.IteratorUtils;
+import org.apache.commons.logging.Log;
+
+public class MRCounters extends org.apache.hadoop.mapred.Counters {
+  private final org.apache.tez.common.counters.TezCounters raw;
+  
+  public MRCounters(org.apache.tez.common.counters.TezCounters raw) {
+    this.raw = raw;
+  }
+
+  @Override
+  public synchronized org.apache.hadoop.mapred.Counters.Group getGroup(String groupName) {
+    return new MRCounterGroup(raw.getGroup(groupName));
+  }
+
+  @SuppressWarnings("unchecked")
+  @Override
+  public synchronized Collection<String> getGroupNames() {
+    return IteratorUtils.toList(raw.getGroupNames().iterator());  }
+
+  @Override
+  public synchronized String makeCompactString() {
+    StringBuilder builder = new StringBuilder();
+    boolean first = true;
+    for(Group group: this){
+      for(Counter counter: group) {
+        if (first) {
+          first = false;
+        } else {
+          builder.append(',');
+        }
+        builder.append(group.getDisplayName());
+        builder.append('.');
+        builder.append(counter.getDisplayName());
+        builder.append(':');
+        builder.append(counter.getCounter());
+      }
+    }
+    return builder.toString();
+  }
+
+  @Override
+  public synchronized Counter findCounter(String group, String name) {
+    return new MRCounter(raw.findCounter(group, name));
+  }
+
+  @Override
+  public Counter findCounter(String group, int id, String name) {
+    return new MRCounter(raw.findCounter(group, name));
+  }
+
+  @Override
+  public void incrCounter(Enum<?> key, long amount) {
+    raw.findCounter(key).increment(amount);
+  }
+
+  @Override
+  public void incrCounter(String group, String counter, long amount) {
+    raw.findCounter(group, counter).increment(amount);
+  }
+
+  @Override
+  public synchronized long getCounter(Enum<?> key) {
+    return raw.findCounter(key).getValue();
+  }
+
+  @Override
+  public synchronized void incrAllCounters(
+      org.apache.hadoop.mapred.Counters other) {
+    for (Group otherGroup: other) {
+      Group group = getGroup(otherGroup.getName());
+      group.setDisplayName(otherGroup.getDisplayName());
+      for (Counter otherCounter : otherGroup) {
+        Counter counter = group.getCounterForName(otherCounter.getName());
+        counter.setDisplayName(otherCounter.getDisplayName());
+        counter.increment(otherCounter.getValue());
+      }
+    }
+  }
+  
+  @Override
+  public int size() {
+    return countCounters();
+  }
+
+  @Override
+  public void log(Log log) {
+    log.info("Counters: " + size());
+    for(Group group: this) {
+      log.info("  " + group.getDisplayName());
+      for (Counter counter: group) {
+        log.info("    " + counter.getDisplayName() + "=" +
+                 counter.getCounter());
+      }
+    }
+  }
+
+  @Override
+  public String makeEscapedCompactString() {
+    return toEscapedCompactString(this);
+  }
+  
+  public static class MRCounterGroup extends org.apache.hadoop.mapred.Counters.Group {
+    private final org.apache.tez.common.counters.CounterGroup group;
+    public MRCounterGroup(org.apache.tez.common.counters.CounterGroup group) {
+      this.group = group;
+    }
+    @Override
+    public String getName() {
+      return group.getName();
+    }
+    @Override
+    public String getDisplayName() {
+      return group.getDisplayName();
+    }
+    @Override
+    public void setDisplayName(String displayName) {
+      group.setDisplayName(displayName);
+    }
+    @Override
+    public void addCounter(org.apache.hadoop.mapred.Counters.Counter counter) {
+      group.addCounter(convert(counter));
+    }
+    @Override
+    public org.apache.hadoop.mapred.Counters.Counter addCounter(String name,
+        String displayName, long value) {
+      return new MRCounter(group.addCounter(name, displayName, value));
+    }
+    @Override
+    public org.apache.hadoop.mapred.Counters.Counter findCounter(
+        String counterName, String displayName) {
+      return new MRCounter(group.findCounter(counterName, displayName));
+    }
+    @Override
+    public int size() {
+      return group.size();
+    }
+    @Override
+    public void incrAllCounters(
+        org.apache.hadoop.mapreduce.counters.CounterGroupBase rightGroup) {
+      new MRCounterGroup(group).incrAllCounters(rightGroup);
+    }
+    @Override
+    public org.apache.hadoop.mapreduce.counters.CounterGroupBase 
+    getUnderlyingGroup() {
+      return new MRCounterGroup(group).getUnderlyingGroup();
+    }
+    @Override
+    public void readFields(DataInput arg0) throws IOException {
+    }
+    @Override
+    public void write(DataOutput arg0) throws IOException {
+    }
+    @Override
+    public Iterator iterator() {
+      // FIXME?
+      return group.iterator();
+    }
+  }
+  
+  public static class MRCounter extends Counter {
+    private final org.apache.tez.common.counters.TezCounter raw;
+    
+    public MRCounter(org.apache.tez.common.counters.TezCounter raw) {
+      this.raw = raw;
+    }
+
+    @Override
+    public void setDisplayName(String displayName) {
+      // TODO Auto-generated method stub
+      raw.setDisplayName(displayName);
+    }
+
+    @Override
+    public String getName() {
+      return raw.getName();
+    }
+
+    @Override
+    public String getDisplayName() {
+      return raw.getDisplayName();
+    }
+
+    @Override
+    public long getValue() {
+      return raw.getValue();
+    }
+
+    @Override
+    public void setValue(long value) {
+      raw.setValue(value);
+    }
+
+    @Override
+    public void increment(long incr) {
+      raw.increment(incr);
+    }
+
+    @Override
+    public void write(DataOutput out) throws IOException {
+      raw.write(out);
+    }
+
+    @Override
+    public void readFields(DataInput in) throws IOException {
+      raw.readFields(in);
+    }
+
+    @Override
+    public String makeEscapedCompactString() {
+      return toEscapedCompactString(new MRCounter(raw));
+    }
+
+    @Deprecated
+    public boolean contentEquals(Counter counter) {
+      MRCounter c = new MRCounter(raw);
+      return c.equals(counter.getUnderlyingCounter());
+    }
+
+
+    @Override
+    public long getCounter() {
+      return raw.getValue();
+    }
+
+    @Override
+    public org.apache.hadoop.mapreduce.Counter getUnderlyingCounter() {
+      return new MRCounter(raw).getUnderlyingCounter();
+    }
+
+    @Override
+    public synchronized boolean equals(Object genericRight) {
+      return raw.equals(genericRight);
+    }
+
+    @Override
+    public int hashCode() {
+      // TODO Auto-generated method stub
+      return raw.hashCode();
+    }
+  }
+  
+  static org.apache.tez.common.counters.TezCounter convert(
+      org.apache.hadoop.mapred.Counters.Counter counter) {
+    org.apache.hadoop.mapreduce.Counter underlyingCounter =
+        counter.getUnderlyingCounter();
+    if (underlyingCounter instanceof org.apache.hadoop.mapreduce.counters.FrameworkCounterGroup.FrameworkCounter) {
+      org.apache.hadoop.mapreduce.counters.FrameworkCounterGroup.FrameworkCounter 
+      real = 
+      (org.apache.hadoop.mapreduce.counters.FrameworkCounterGroup.FrameworkCounter)underlyingCounter;
+      return new org.apache.tez.common.counters.FrameworkCounterGroup.FrameworkCounter(
+          real.getKey(), real.getGroupName());
+    } else if (underlyingCounter instanceof org.apache.hadoop.mapreduce.counters.FileSystemCounterGroup.FSCounter) {
+      org.apache.hadoop.mapreduce.counters.FileSystemCounterGroup.FSCounter real = 
+          (org.apache.hadoop.mapreduce.counters.FileSystemCounterGroup.FSCounter)underlyingCounter;
+      return new org.apache.tez.common.counters.FileSystemCounterGroup.FSCounter(
+          real.getScheme(), convert(real.getFileSystemCounter()));
+    } else {
+      return new org.apache.tez.common.counters.GenericCounter(
+          underlyingCounter.getName(), 
+          underlyingCounter.getDisplayName(), 
+          underlyingCounter.getValue());
+    }
+  }
+  
+  static org.apache.tez.common.counters.FileSystemCounter convert(
+      org.apache.hadoop.mapreduce.FileSystemCounter c) {
+    switch (c) {
+      case BYTES_READ:
+        return org.apache.tez.common.counters.FileSystemCounter.BYTES_READ;
+      case BYTES_WRITTEN:
+        return org.apache.tez.common.counters.FileSystemCounter.BYTES_WRITTEN;
+      case READ_OPS:
+        return org.apache.tez.common.counters.FileSystemCounter.READ_OPS;
+      case LARGE_READ_OPS:
+        return org.apache.tez.common.counters.FileSystemCounter.LARGE_READ_OPS;
+      case WRITE_OPS:
+        return org.apache.tez.common.counters.FileSystemCounter.WRITE_OPS;
+      default:
+        throw new IllegalArgumentException("Unknow FileSystemCounter: " + c);
+    }
+    
+  }
+}

Propchange: incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/mapred/MRCounters.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/mapred/TaskAttemptContextImpl.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/mapred/TaskAttemptContextImpl.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/mapred/TaskAttemptContextImpl.java (added)
+++ incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/mapred/TaskAttemptContextImpl.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,110 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.mapreduce.hadoop.mapred;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.TaskAttemptContext;
+import org.apache.hadoop.mapred.TaskAttemptID;
+import org.apache.hadoop.mapred.TaskID;
+import org.apache.hadoop.mapreduce.Counter;
+import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.util.Progressable;
+import org.apache.tez.engine.records.TezTaskAttemptID;
+import org.apache.tez.mapreduce.processor.MRTaskReporter;
+
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class TaskAttemptContextImpl
+       extends org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl 
+       implements TaskAttemptContext {
+  private MRTaskReporter reporter;
+
+  public TaskAttemptContextImpl(JobConf conf, TezTaskAttemptID taskid) {
+    this(conf, taskid, null);
+  }
+  
+  // FIXME we need to use DAG Id but we are using App Id
+  public TaskAttemptContextImpl(JobConf conf, TezTaskAttemptID taskAttemptId,
+                         MRTaskReporter reporter) {
+    super(conf, 
+        new TaskAttemptID(
+            new TaskID(
+                Long.toString(taskAttemptId.getTaskID().getVertexID().
+                    getDAGId().getApplicationId().getClusterTimestamp()),
+                taskAttemptId.getTaskID().getVertexID().getDAGId().
+                    getApplicationId().getId(),
+                (taskAttemptId.getTaskID().getVertexID().getId() == 0 ?
+                    TaskType.MAP : TaskType.REDUCE),
+                taskAttemptId.getTaskID().getId()),
+              taskAttemptId.getId()));
+    this.reporter = reporter;
+  }
+  
+  /**
+   * Get the taskAttemptID.
+   *  
+   * @return TaskAttemptID
+   */
+  public TaskAttemptID getTaskAttemptID() {
+    return (TaskAttemptID) super.getTaskAttemptID();
+  }
+  
+  public Progressable getProgressible() {
+    return reporter;
+  }
+  
+  public JobConf getJobConf() {
+    return (JobConf) getConfiguration();
+  }
+  
+  @Override
+  public float getProgress() {
+    return reporter.getProgress();
+  }
+
+  @Override
+  public Counter getCounter(Enum<?> counterName) {
+    return (Counter) reporter.getCounter(counterName);
+  }
+
+  @Override
+  public Counter getCounter(String groupName, String counterName) {
+    return (Counter) reporter.getCounter(groupName, counterName);
+  }
+
+  /**
+   * Report progress.
+   */
+  @Override
+  public void progress() {
+    reporter.progress();
+  }
+
+  /**
+   * Set the current status of the task to the given string.
+   */
+  @Override
+  public void setStatus(String status) {
+    setStatusString(status);
+    reporter.setStatus(status);
+  }
+
+
+}

Propchange: incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/mapred/TaskAttemptContextImpl.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/mapreduce/JobContextImpl.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/mapreduce/JobContextImpl.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/mapreduce/JobContextImpl.java (added)
+++ incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/mapreduce/JobContextImpl.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,450 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.mapreduce.hadoop.mapreduce;
+
+import java.io.IOException;
+import java.net.URI;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configuration.IntegerRanges;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.RawComparator;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.JobContext;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.Partitioner;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.filecache.DistributedCache;
+import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
+import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.util.Progressable;
+import org.apache.tez.engine.records.TezDAGID;
+import org.apache.tez.mapreduce.hadoop.IDConverter;
+import org.apache.tez.mapreduce.hadoop.MRJobConfig;
+import org.apache.tez.mapreduce.processor.MRTaskReporter;
+
+/**
+ * A read-only view of the job that is provided to the tasks while they
+ * are running.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class JobContextImpl implements JobContext {
+
+  protected final org.apache.hadoop.mapred.JobConf conf;
+  private TezDAGID dagId;
+  /**
+   * The UserGroupInformation object that has a reference to the current user
+   */
+  protected UserGroupInformation ugi;
+  protected final Credentials credentials;
+  private Progressable progress;
+
+  public JobContextImpl(Configuration conf, TezDAGID dagId) {
+    this(conf, dagId, MRTaskReporter.NULL);
+  }
+  
+  public JobContextImpl(Configuration conf, TezDAGID dagId, Progressable progress) {
+    if (conf instanceof JobConf) {
+      this.conf = (JobConf)conf;
+    } else {
+      this.conf = new JobConf(conf);
+    }
+    this.dagId = dagId;
+    this.credentials = this.conf.getCredentials();
+    try {
+      this.ugi = UserGroupInformation.getCurrentUser();
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+    this.progress = progress;
+  }
+
+  /**
+   * Return the configuration for the job.
+   * @return the shared configuration object
+   */
+  public Configuration getConfiguration() {
+    return conf;
+  }
+
+  /**
+   * Get the unique ID for the job.
+   * @return the object with the job id
+   */
+  public JobID getJobID() {
+    return IDConverter.toMRJobId(dagId);
+  }
+  
+  /**
+   * Set the JobID.
+   */
+  public void setJobID(JobID jobId) {
+    this.dagId = IDConverter.fromMRJobId(jobId);
+  }
+  
+  /**
+   * Get configured the number of reduce tasks for this job. Defaults to 
+   * <code>1</code>.
+   * @return the number of reduce tasks for this job.
+   */
+  public int getNumReduceTasks() {
+    return conf.getNumReduceTasks();
+  }
+  
+  /**
+   * Get the current working directory for the default file system.
+   * 
+   * @return the directory name.
+   */
+  public Path getWorkingDirectory() throws IOException {
+    return conf.getWorkingDirectory();
+  }
+
+  /**
+   * Get the key class for the job output data.
+   * @return the key class for the job output data.
+   */
+  public Class<?> getOutputKeyClass() {
+    return conf.getOutputKeyClass();
+  }
+  
+  /**
+   * Get the value class for job outputs.
+   * @return the value class for job outputs.
+   */
+  public Class<?> getOutputValueClass() {
+    return conf.getOutputValueClass();
+  }
+
+  /**
+   * Get the key class for the map output data. If it is not set, use the
+   * (final) output key class. This allows the map output key class to be
+   * different than the final output key class.
+   * @return the map output key class.
+   */
+  public Class<?> getMapOutputKeyClass() {
+    return conf.getMapOutputKeyClass();
+  }
+
+  /**
+   * Get the value class for the map output data. If it is not set, use the
+   * (final) output value class This allows the map output value class to be
+   * different than the final output value class.
+   *  
+   * @return the map output value class.
+   */
+  public Class<?> getMapOutputValueClass() {
+    return conf.getMapOutputValueClass();
+  }
+
+  /**
+   * Get the user-specified job name. This is only used to identify the 
+   * job to the user.
+   * 
+   * @return the job's name, defaulting to "".
+   */
+  public String getJobName() {
+    return conf.getJobName();
+  }
+
+  /**
+   * Get the {@link InputFormat} class for the job.
+   * 
+   * @return the {@link InputFormat} class for the job.
+   */
+  @SuppressWarnings("unchecked")
+  public Class<? extends InputFormat<?,?>> getInputFormatClass() 
+     throws ClassNotFoundException {
+    return (Class<? extends InputFormat<?,?>>) 
+      conf.getClass(INPUT_FORMAT_CLASS_ATTR, TextInputFormat.class);
+  }
+
+  /**
+   * Get the {@link Mapper} class for the job.
+   * 
+   * @return the {@link Mapper} class for the job.
+   */
+  @SuppressWarnings("unchecked")
+  public Class<? extends Mapper<?,?,?,?>> getMapperClass() 
+     throws ClassNotFoundException {
+    return (Class<? extends Mapper<?,?,?,?>>) 
+      conf.getClass(MAP_CLASS_ATTR, Mapper.class);
+  }
+
+  /**
+   * Get the combiner class for the job.
+   * 
+   * @return the combiner class for the job.
+   */
+  @SuppressWarnings("unchecked")
+  public Class<? extends Reducer<?,?,?,?>> getCombinerClass() 
+     throws ClassNotFoundException {
+    return (Class<? extends Reducer<?,?,?,?>>) 
+      conf.getClass(COMBINE_CLASS_ATTR, null);
+  }
+
+  /**
+   * Get the {@link Reducer} class for the job.
+   * 
+   * @return the {@link Reducer} class for the job.
+   */
+  @SuppressWarnings("unchecked")
+  public Class<? extends Reducer<?,?,?,?>> getReducerClass() 
+     throws ClassNotFoundException {
+    return (Class<? extends Reducer<?,?,?,?>>) 
+      conf.getClass(REDUCE_CLASS_ATTR, Reducer.class);
+  }
+
+  /**
+   * Get the {@link OutputFormat} class for the job.
+   * 
+   * @return the {@link OutputFormat} class for the job.
+   */
+  @SuppressWarnings("unchecked")
+  public Class<? extends OutputFormat<?,?>> getOutputFormatClass() 
+     throws ClassNotFoundException {
+    return (Class<? extends OutputFormat<?,?>>) 
+      conf.getClass(OUTPUT_FORMAT_CLASS_ATTR, TextOutputFormat.class);
+  }
+
+  /**
+   * Get the {@link Partitioner} class for the job.
+   * 
+   * @return the {@link Partitioner} class for the job.
+   */
+  @SuppressWarnings("unchecked")
+  public Class<? extends Partitioner<?,?>> getPartitionerClass() 
+     throws ClassNotFoundException {
+    return (Class<? extends Partitioner<?,?>>) 
+      conf.getClass(PARTITIONER_CLASS_ATTR, HashPartitioner.class);
+  }
+
+  /**
+   * Get the {@link RawComparator} comparator used to compare keys.
+   * 
+   * @return the {@link RawComparator} comparator used to compare keys.
+   */
+  public RawComparator<?> getSortComparator() {
+    return conf.getOutputKeyComparator();
+  }
+
+  /**
+   * Get the pathname of the job's jar.
+   * @return the pathname
+   */
+  public String getJar() {
+    return conf.getJar();
+  }
+
+  /** 
+   * Get the user defined {@link RawComparator} comparator for 
+   * grouping keys of inputs to the reduce.
+   * 
+   * @return comparator set by the user for grouping values.
+   * @see Job#setGroupingComparatorClass(Class) for details.  
+   */
+  public RawComparator<?> getGroupingComparator() {
+    return conf.getOutputValueGroupingComparator();
+  }
+  
+  /**
+   * Get whether job-setup and job-cleanup is needed for the job 
+   * 
+   * @return boolean 
+   */
+  public boolean getJobSetupCleanupNeeded() {
+    return conf.getBoolean(MRJobConfig.SETUP_CLEANUP_NEEDED, true);
+  }
+  
+  /**
+   * Get whether task-cleanup is needed for the job 
+   * 
+   * @return boolean 
+   */
+  public boolean getTaskCleanupNeeded() {
+    return conf.getBoolean(MRJobConfig.TASK_CLEANUP_NEEDED, true);
+  }
+
+  /**
+   * This method checks to see if symlinks are to be create for the 
+   * localized cache files in the current working directory 
+   * @return true if symlinks are to be created- else return false
+   */
+  public boolean getSymlink() {
+    return DistributedCache.getSymlink(conf);
+  }
+  
+  /**
+   * Get the archive entries in classpath as an array of Path
+   */
+  public Path[] getArchiveClassPaths() {
+    return DistributedCache.getArchiveClassPaths(conf);
+  }
+
+  /**
+   * Get cache archives set in the Configuration
+   * @return A URI array of the caches set in the Configuration
+   * @throws IOException
+   */
+  public URI[] getCacheArchives() throws IOException {
+    return DistributedCache.getCacheArchives(conf);
+  }
+
+  /**
+   * Get cache files set in the Configuration
+   * @return A URI array of the files set in the Configuration
+   * @throws IOException
+   */
+
+  public URI[] getCacheFiles() throws IOException {
+    return DistributedCache.getCacheFiles(conf);
+  }
+
+  /**
+   * Return the path array of the localized caches
+   * @return A path array of localized caches
+   * @throws IOException
+   */
+  public Path[] getLocalCacheArchives()
+    throws IOException {
+    return DistributedCache.getLocalCacheArchives(conf);
+  }
+
+  /**
+   * Return the path array of the localized files
+   * @return A path array of localized files
+   * @throws IOException
+   */
+  public Path[] getLocalCacheFiles()
+    throws IOException {
+    return DistributedCache.getLocalCacheFiles(conf);
+  }
+
+  /**
+   * Get the file entries in classpath as an array of Path
+   */
+  public Path[] getFileClassPaths() {
+    return DistributedCache.getFileClassPaths(conf);
+  }
+  
+  /**
+   * Get the timestamps of the archives.  Used by internal
+   * DistributedCache and MapReduce code.
+   * @return a string array of timestamps 
+   * @throws IOException
+   */
+  public String[] getArchiveTimestamps() {
+    return DistributedCache.getArchiveTimestamps(conf);
+  }
+
+  /**
+   * Get the timestamps of the files.  Used by internal
+   * DistributedCache and MapReduce code.
+   * @return a string array of timestamps 
+   * @throws IOException
+   */
+  public String[] getFileTimestamps() {
+    return DistributedCache.getFileTimestamps(conf);
+  }
+
+  /** 
+   * Get the configured number of maximum attempts that will be made to run a
+   * map task, as specified by the <code>mapred.map.max.attempts</code>
+   * property. If this property is not already set, the default is 4 attempts.
+   *  
+   * @return the max number of attempts per map task.
+   */
+  public int getMaxMapAttempts() {
+    return conf.getMaxMapAttempts();
+  }
+
+  /** 
+   * Get the configured number of maximum attempts  that will be made to run a
+   * reduce task, as specified by the <code>mapred.reduce.max.attempts</code>
+   * property. If this property is not already set, the default is 4 attempts.
+   * 
+   * @return the max number of attempts per reduce task.
+   */
+  public int getMaxReduceAttempts() {
+    return conf.getMaxReduceAttempts();
+  }
+
+  /**
+   * Get whether the task profiling is enabled.
+   * @return true if some tasks will be profiled
+   */
+  public boolean getProfileEnabled() {
+    return conf.getProfileEnabled();
+  }
+
+  /**
+   * Get the profiler configuration arguments.
+   *
+   * The default value for this property is
+   * "-agentlib:hprof=cpu=samples,heap=sites,force=n,thread=y,verbose=n,file=%s"
+   * 
+   * @return the parameters to pass to the task child to configure profiling
+   */
+  public String getProfileParams() {
+    return conf.getProfileParams();
+  }
+
+  /**
+   * Get the range of maps or reduces to profile.
+   * @param isMap is the task a map?
+   * @return the task ranges
+   */
+  public IntegerRanges getProfileTaskRange(boolean isMap) {
+    return conf.getProfileTaskRange(isMap);
+  }
+
+  /**
+   * Get the reported username for this job.
+   * 
+   * @return the username
+   */
+  public String getUser() {
+    return conf.getUser();
+  }
+
+  public Credentials getCredentials() {
+    return credentials;
+  }
+
+  @Override
+  public JobConf getJobConf() {
+    return conf;
+  }
+
+  @Override
+  public Progressable getProgressible() {
+    return progress;
+  }
+  
+}

Propchange: incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/mapreduce/JobContextImpl.java
------------------------------------------------------------------------------
    svn:eol-style = native