You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by hi...@apache.org on 2013/04/19 01:54:28 UTC
svn commit: r1469642 [32/36] - in /incubator/tez/branches/TEZ-1: ./
example_jobs/ example_jobs/sampleInput/ example_jobs/wc_mr_6m_1r/
example_jobs/wc_mrr_6m_3r_3r/ ljr_helper/ tez-common/ tez-common/src/
tez-common/src/main/ tez-common/src/main/java/ t...
Added: incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRJobConfig.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRJobConfig.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRJobConfig.java (added)
+++ incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRJobConfig.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,676 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.mapreduce.hadoop;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * Configuration keys, default values, and well-known file and class names
+ * for MapReduce jobs.
+ *
+ * <p>All attribute names are collected here so that Job and JobContext stay
+ * consistent. Several key strings look inconsistent with their constant names
+ * or contain misspellings; the strings are intentionally left unchanged
+ * because existing configurations are looked up by these exact keys.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public interface MRJobConfig {
+
+ // -------------------------------------------------------------------------
+ // Job-level keys.
+ // -------------------------------------------------------------------------
+
+ public static final String INPUT_FORMAT_CLASS_ATTR = "mapreduce.job.inputformat.class";
+
+ public static final String MAP_CLASS_ATTR = "mapreduce.job.map.class";
+
+ public static final String COMBINE_CLASS_ATTR = "mapreduce.job.combine.class";
+
+ public static final String REDUCE_CLASS_ATTR = "mapreduce.job.reduce.class";
+
+ public static final String OUTPUT_FORMAT_CLASS_ATTR = "mapreduce.job.outputformat.class";
+
+ public static final String PARTITIONER_CLASS_ATTR = "mapreduce.job.partitioner.class";
+
+ public static final String SETUP_CLEANUP_NEEDED = "mapreduce.job.committer.setup.cleanup.needed";
+
+ public static final String TASK_CLEANUP_NEEDED = "mapreduce.job.committer.task.cleanup.needed";
+
+ public static final String JAR = "mapreduce.job.jar";
+
+ public static final String ID = "mapreduce.job.id";
+
+ public static final String JOB_NAME = "mapreduce.job.name";
+
+ public static final String JAR_UNPACK_PATTERN = "mapreduce.job.jar.unpack.pattern";
+
+ public static final String USER_NAME = "mapreduce.job.user.name";
+
+ public static final String PRIORITY = "mapreduce.job.priority";
+
+ public static final String QUEUE_NAME = "mapreduce.job.queuename";
+
+ public static final String JVM_NUMTASKS_TORUN = "mapreduce.job.jvm.numtasks";
+
+ public static final String SPLIT_FILE = "mapreduce.job.splitfile";
+
+ public static final String SPLIT_METAINFO_MAXSIZE = "mapreduce.job.split.metainfo.maxsize";
+ /** Default for {@link #SPLIT_METAINFO_MAXSIZE}: 10,000,000. */
+ public static final long DEFAULT_SPLIT_METAINFO_MAXSIZE = 10000000L;
+
+ public static final String NUM_MAPS = "mapreduce.job.maps";
+
+ public static final String MAX_TASK_FAILURES_PER_TRACKER = "mapreduce.job.maxtaskfailures.per.tracker";
+
+ public static final String COMPLETED_MAPS_FOR_REDUCE_SLOWSTART = "mapreduce.job.reduce.slowstart.completedmaps";
+
+ public static final String NUM_REDUCES = "mapreduce.job.reduces";
+
+ public static final String SKIP_RECORDS = "mapreduce.job.skiprecords";
+
+ public static final String SKIP_OUTDIR = "mapreduce.job.skip.outdir";
+
+ public static final String SPECULATIVE_SLOWNODE_THRESHOLD = "mapreduce.job.speculative.slownodethreshold";
+
+ public static final String SPECULATIVE_SLOWTASK_THRESHOLD = "mapreduce.job.speculative.slowtaskthreshold";
+
+ public static final String SPECULATIVECAP = "mapreduce.job.speculative.speculativecap";
+
+ public static final String JOB_LOCAL_DIR = "mapreduce.job.local.dir";
+
+ public static final String OUTPUT_KEY_CLASS = "mapreduce.job.output.key.class";
+
+ public static final String OUTPUT_VALUE_CLASS = "mapreduce.job.output.value.class";
+
+ public static final String KEY_COMPARATOR = "mapreduce.job.output.key.comparator.class";
+
+ public static final String GROUP_COMPARATOR_CLASS = "mapreduce.job.output.group.comparator.class";
+
+ public static final String WORKING_DIR = "mapreduce.job.working.dir";
+
+ // -------------------------------------------------------------------------
+ // Distributed-cache / classpath keys.
+ // -------------------------------------------------------------------------
+
+ public static final String CLASSPATH_ARCHIVES = "mapreduce.job.classpath.archives";
+
+ public static final String CLASSPATH_FILES = "mapreduce.job.classpath.files";
+
+ public static final String CACHE_FILES = "mapreduce.job.cache.files";
+
+ public static final String CACHE_ARCHIVES = "mapreduce.job.cache.archives";
+
+ public static final String CACHE_FILES_SIZES = "mapreduce.job.cache.files.filesizes"; // internal use only
+
+ public static final String CACHE_ARCHIVES_SIZES = "mapreduce.job.cache.archives.filesizes"; // internal use only
+
+ public static final String CACHE_LOCALFILES = "mapreduce.job.cache.local.files";
+
+ public static final String CACHE_LOCALARCHIVES = "mapreduce.job.cache.local.archives";
+
+ public static final String CACHE_FILE_TIMESTAMPS = "mapreduce.job.cache.files.timestamps";
+
+ public static final String CACHE_ARCHIVES_TIMESTAMPS = "mapreduce.job.cache.archives.timestamps";
+
+ public static final String CACHE_FILE_VISIBILITIES = "mapreduce.job.cache.files.visibilities";
+
+ public static final String CACHE_ARCHIVES_VISIBILITIES = "mapreduce.job.cache.archives.visibilities";
+
+ /**
+ * @deprecated Symlinks are always on and cannot be disabled.
+ */
+ @Deprecated
+ public static final String CACHE_SYMLINK = "mapreduce.job.cache.symlink.create";
+
+ public static final String USER_LOG_RETAIN_HOURS = "mapreduce.job.userlog.retain.hours";
+
+ public static final String MAPREDUCE_JOB_USER_CLASSPATH_FIRST = "mapreduce.job.user.classpath.first";
+
+ // -------------------------------------------------------------------------
+ // Task-level keys.
+ // -------------------------------------------------------------------------
+
+ public static final String IO_SORT_FACTOR = "mapreduce.task.io.sort.factor";
+
+ public static final String IO_SORT_MB = "mapreduce.task.io.sort.mb";
+
+ public static final String INDEX_CACHE_MEMORY_LIMIT = "mapreduce.task.index.cache.limit.bytes";
+
+ public static final String PRESERVE_FAILED_TASK_FILES = "mapreduce.task.files.preserve.failedtasks";
+
+ public static final String PRESERVE_FILES_PATTERN = "mapreduce.task.files.preserve.filepattern";
+
+ public static final String TASK_TEMP_DIR = "mapreduce.task.tmp.dir";
+
+ public static final String TASK_DEBUGOUT_LINES = "mapreduce.task.debugout.lines";
+
+ public static final String RECORDS_BEFORE_PROGRESS = "mapreduce.task.merge.progress.records";
+
+ public static final String SKIP_START_ATTEMPTS = "mapreduce.task.skip.start.attempts";
+
+ public static final String TASK_ATTEMPT_ID = "mapreduce.task.attempt.id";
+
+ public static final String TASK_ISMAP = "mapreduce.task.ismap";
+
+ public static final String TASK_PARTITION = "mapreduce.task.partition";
+
+ public static final String TASK_PROFILE = "mapreduce.task.profile";
+
+ public static final String TASK_PROFILE_PARAMS = "mapreduce.task.profile.params";
+
+ public static final String NUM_MAP_PROFILES = "mapreduce.task.profile.maps";
+
+ public static final String NUM_REDUCE_PROFILES = "mapreduce.task.profile.reduces";
+
+ public static final String TASK_MAP_PROFILE_PARAMS = "mapreduce.task.profile.map.params";
+
+ public static final String TASK_REDUCE_PROFILE_PARAMS = "mapreduce.task.profile.reduce.params";
+
+ public static final String TASK_TIMEOUT = "mapreduce.task.timeout";
+
+ public static final String TASK_TIMEOUT_CHECK_INTERVAL_MS = "mapreduce.task.timeout.check-interval-ms";
+
+ public static final String TASK_ID = "mapreduce.task.id";
+
+ public static final String TASK_OUTPUT_DIR = "mapreduce.task.output.dir";
+
+ public static final String TASK_USERLOG_LIMIT = "mapreduce.task.userlog.limit.kb";
+
+ // -------------------------------------------------------------------------
+ // Map-side keys.
+ // -------------------------------------------------------------------------
+
+ public static final String MAP_SORT_SPILL_PERCENT = "mapreduce.map.sort.spill.percent";
+
+ public static final String MAP_INPUT_FILE = "mapreduce.map.input.file";
+
+ /**
+ * NOTE(review): despite the constant's name, this key is
+ * "mapreduce.map.input.length", not an input-path key. The string is left
+ * unchanged so lookups by this key keep working; confirm the intended key
+ * before relying on it for a path.
+ */
+ public static final String MAP_INPUT_PATH = "mapreduce.map.input.length";
+
+ public static final String MAP_INPUT_START = "mapreduce.map.input.start";
+
+ public static final String MAP_MEMORY_MB = "mapreduce.map.memory.mb";
+ public static final int DEFAULT_MAP_MEMORY_MB = 1024;
+
+ public static final String MAP_CPU_VCORES = "mapreduce.map.cpu.vcores";
+ public static final int DEFAULT_MAP_CPU_VCORES = 1;
+
+ public static final String MAP_MEMORY_PHYSICAL_MB = "mapreduce.map.memory.physical.mb";
+
+ public static final String MAP_ENV = "mapreduce.map.env";
+
+ public static final String MAP_JAVA_OPTS = "mapreduce.map.java.opts";
+
+ public static final String MAP_MAX_ATTEMPTS = "mapreduce.map.maxattempts";
+
+ public static final String MAP_DEBUG_SCRIPT = "mapreduce.map.debug.script";
+
+ public static final String MAP_SPECULATIVE = "mapreduce.map.speculative";
+
+ public static final String MAP_FAILURES_MAX_PERCENT = "mapreduce.map.failures.maxpercent";
+
+ public static final String MAP_SKIP_INCR_PROC_COUNT = "mapreduce.map.skip.proc-count.auto-incr";
+
+ public static final String MAP_SKIP_MAX_RECORDS = "mapreduce.map.skip.maxrecords";
+
+ public static final String MAP_COMBINE_MIN_SPILLS = "mapreduce.map.combine.minspills";
+
+ public static final String MAP_OUTPUT_COMPRESS = "mapreduce.map.output.compress";
+
+ public static final String MAP_OUTPUT_COMPRESS_CODEC = "mapreduce.map.output.compress.codec";
+
+ public static final String MAP_OUTPUT_KEY_CLASS = "mapreduce.map.output.key.class";
+
+ public static final String MAP_OUTPUT_VALUE_CLASS = "mapreduce.map.output.value.class";
+
+ // The constant name misspells "SEPARATOR"; it is kept for source compatibility.
+ public static final String MAP_OUTPUT_KEY_FIELD_SEPERATOR = "mapreduce.map.output.key.field.separator";
+
+ public static final String MAP_LOG_LEVEL = "mapreduce.map.log.level";
+
+ public static final String REDUCE_LOG_LEVEL = "mapreduce.reduce.log.level";
+
+ public static final String DEFAULT_LOG_LEVEL = "INFO";
+
+ // -------------------------------------------------------------------------
+ // Reduce-side and shuffle keys.
+ // -------------------------------------------------------------------------
+
+ public static final String REDUCE_MERGE_INMEM_THRESHOLD = "mapreduce.reduce.merge.inmem.threshold";
+
+ public static final String REDUCE_INPUT_BUFFER_PERCENT = "mapreduce.reduce.input.buffer.percent";
+
+ public static final String REDUCE_MARKRESET_BUFFER_PERCENT = "mapreduce.reduce.markreset.buffer.percent";
+
+ public static final String REDUCE_MARKRESET_BUFFER_SIZE = "mapreduce.reduce.markreset.buffer.size";
+
+ public static final String REDUCE_MEMORY_PHYSICAL_MB = "mapreduce.reduce.memory.physical.mb";
+
+ public static final String REDUCE_MEMORY_MB = "mapreduce.reduce.memory.mb";
+ public static final int DEFAULT_REDUCE_MEMORY_MB = 1024;
+
+ public static final String REDUCE_CPU_VCORES = "mapreduce.reduce.cpu.vcores";
+ public static final int DEFAULT_REDUCE_CPU_VCORES = 1;
+
+ public static final String REDUCE_MEMORY_TOTAL_BYTES = "mapreduce.reduce.memory.totalbytes";
+
+ public static final String SHUFFLE_INPUT_BUFFER_PERCENT = "mapreduce.reduce.shuffle.input.buffer.percent";
+
+ public static final String SHUFFLE_MEMORY_LIMIT_PERCENT
+ = "mapreduce.reduce.shuffle.memory.limit.percent";
+
+ public static final String SHUFFLE_MERGE_PERCENT = "mapreduce.reduce.shuffle.merge.percent";
+
+ public static final String REDUCE_FAILURES_MAXPERCENT = "mapreduce.reduce.failures.maxpercent";
+
+ public static final String REDUCE_ENV = "mapreduce.reduce.env";
+
+ public static final String REDUCE_JAVA_OPTS = "mapreduce.reduce.java.opts";
+
+ // Job-level key, although it is declared among the reduce keys.
+ public static final String MAPREDUCE_JOB_DIR = "mapreduce.job.dir";
+
+ public static final String REDUCE_MAX_ATTEMPTS = "mapreduce.reduce.maxattempts";
+
+ public static final String SHUFFLE_PARALLEL_COPIES = "mapreduce.reduce.shuffle.parallelcopies";
+
+ public static final String REDUCE_DEBUG_SCRIPT = "mapreduce.reduce.debug.script";
+
+ public static final String REDUCE_SPECULATIVE = "mapreduce.reduce.speculative";
+
+ public static final String SHUFFLE_CONNECT_TIMEOUT = "mapreduce.reduce.shuffle.connect.timeout";
+
+ public static final String SHUFFLE_READ_TIMEOUT = "mapreduce.reduce.shuffle.read.timeout";
+
+ public static final String SHUFFLE_FETCH_FAILURES = "mapreduce.reduce.shuffle.maxfetchfailures";
+
+ public static final String SHUFFLE_NOTIFY_READERROR = "mapreduce.reduce.shuffle.notify.readerror";
+
+ public static final String REDUCE_SKIP_INCR_PROC_COUNT = "mapreduce.reduce.skip.proc-count.auto-incr";
+
+ public static final String REDUCE_SKIP_MAXGROUPS = "mapreduce.reduce.skip.maxgroups";
+
+ public static final String REDUCE_MEMTOMEM_THRESHOLD = "mapreduce.reduce.merge.memtomem.threshold";
+
+ public static final String REDUCE_MEMTOMEM_ENABLED = "mapreduce.reduce.merge.memtomem.enabled";
+
+ public static final String COMBINE_RECORDS_BEFORE_PROGRESS = "mapreduce.task.combine.progress.records";
+
+ // -------------------------------------------------------------------------
+ // Security, ACLs and submission metadata.
+ // -------------------------------------------------------------------------
+
+ public static final String JOB_NAMENODES = "mapreduce.job.hdfs-servers";
+
+ // NOTE: the key string is spelled "jtprinicipal". Do not "correct" it
+ // without migrating existing configurations that use this exact key.
+ public static final String JOB_JOBTRACKER_ID = "mapreduce.job.kerberos.jtprinicipal";
+
+ public static final String JOB_CANCEL_DELEGATION_TOKEN = "mapreduce.job.complete.cancel.delegation.tokens";
+
+ public static final String JOB_ACL_VIEW_JOB = "mapreduce.job.acl-view-job";
+
+ /** Default for {@link #JOB_ACL_VIEW_JOB}: a single space, not an empty string. */
+ public static final String DEFAULT_JOB_ACL_VIEW_JOB = " ";
+
+ public static final String JOB_ACL_MODIFY_JOB = "mapreduce.job.acl-modify-job";
+
+ /** Default for {@link #JOB_ACL_MODIFY_JOB}: a single space, not an empty string. */
+ public static final String DEFAULT_JOB_ACL_MODIFY_JOB = " ";
+
+ /** Key naming the local file that holds all of the job's credentials. */
+ public static final String MAPREDUCE_JOB_CREDENTIALS_BINARY =
+ "mapreduce.job.credentials.binary";
+
+ public static final String JOB_SUBMITHOST =
+ "mapreduce.job.submithostname";
+ public static final String JOB_SUBMITHOSTADDR =
+ "mapreduce.job.submithostaddress";
+
+ // -------------------------------------------------------------------------
+ // Counter limits.
+ // -------------------------------------------------------------------------
+
+ public static final String COUNTERS_MAX_KEY = "mapreduce.job.counters.max";
+ public static final int COUNTERS_MAX_DEFAULT = 120;
+
+ public static final String COUNTER_GROUP_NAME_MAX_KEY = "mapreduce.job.counters.group.name.max";
+ public static final int COUNTER_GROUP_NAME_MAX_DEFAULT = 128;
+
+ public static final String COUNTER_NAME_MAX_KEY = "mapreduce.job.counters.counter.name.max";
+ public static final int COUNTER_NAME_MAX_DEFAULT = 64;
+
+ public static final String COUNTER_GROUPS_MAX_KEY = "mapreduce.job.counters.groups.max";
+ public static final int COUNTER_GROUPS_MAX_DEFAULT = 50;
+
+ // -------------------------------------------------------------------------
+ // Uber-task keys.
+ // -------------------------------------------------------------------------
+
+ public static final String JOB_UBERTASK_ENABLE =
+ "mapreduce.job.ubertask.enable";
+ public static final String JOB_UBERTASK_MAXMAPS =
+ "mapreduce.job.ubertask.maxmaps";
+ public static final String JOB_UBERTASK_MAXREDUCES =
+ "mapreduce.job.ubertask.maxreduces";
+ public static final String JOB_UBERTASK_MAXBYTES =
+ "mapreduce.job.ubertask.maxbytes";
+
+ // -------------------------------------------------------------------------
+ // Application-master and client keys ("yarn.app.mapreduce.*").
+ // MR_PREFIX must stay declared before every constant that references it.
+ // -------------------------------------------------------------------------
+
+ public static final String MR_PREFIX = "yarn.app.mapreduce.";
+
+ public static final String MR_AM_PREFIX = MR_PREFIX + "am.";
+
+ /**
+ * The number of client retries to the AM before reconnecting to the RM to
+ * fetch the application state.
+ */
+ public static final String MR_CLIENT_TO_AM_IPC_MAX_RETRIES =
+ MR_PREFIX + "client-am.ipc.max-retries";
+ public static final int DEFAULT_MR_CLIENT_TO_AM_IPC_MAX_RETRIES = 3;
+
+ /**
+ * The number of client retries to the RM/HS/AM before throwing an exception.
+ */
+ public static final String MR_CLIENT_MAX_RETRIES =
+ MR_PREFIX + "client.max-retries";
+ public static final int DEFAULT_MR_CLIENT_MAX_RETRIES = 3;
+
+ /** The staging directory for MapReduce. */
+ public static final String MR_AM_STAGING_DIR =
+ MR_AM_PREFIX + "staging-dir";
+ public static final String DEFAULT_MR_AM_STAGING_DIR =
+ "/tmp/hadoop-yarn/staging";
+
+ /** The amount of memory, in MB, the MR app master needs. */
+ public static final String MR_AM_VMEM_MB =
+ MR_AM_PREFIX + "resource.mb";
+ public static final int DEFAULT_MR_AM_VMEM_MB = 1536;
+
+ /** The number of virtual cores the MR app master needs. */
+ public static final String MR_AM_CPU_VCORES =
+ MR_AM_PREFIX + "resource.cpu-vcores";
+ public static final int DEFAULT_MR_AM_CPU_VCORES = 1;
+
+ /** Command line arguments passed to the MR app master. */
+ public static final String MR_AM_COMMAND_OPTS =
+ MR_AM_PREFIX + "command-opts";
+ public static final String DEFAULT_MR_AM_COMMAND_OPTS = "-Xmx1024m";
+
+ /** Admin command opts passed to the MR app master. */
+ public static final String MR_AM_ADMIN_COMMAND_OPTS =
+ MR_AM_PREFIX + "admin-command-opts";
+ public static final String DEFAULT_MR_AM_ADMIN_COMMAND_OPTS = "";
+
+ /** Root logging level passed to the MR app master. */
+ public static final String MR_AM_LOG_LEVEL =
+ MR_AM_PREFIX + "log.level";
+ public static final String DEFAULT_MR_AM_LOG_LEVEL = "INFO";
+
+ /** The number of splits used when reporting progress in MR. */
+ public static final String MR_AM_NUM_PROGRESS_SPLITS =
+ MR_AM_PREFIX + "num-progress-splits";
+ public static final int DEFAULT_MR_AM_NUM_PROGRESS_SPLITS = 12;
+
+ /**
+ * Upper limit on the number of threads used to launch containers in the app
+ * master. Expert-level config; most deployments should not need to set it.
+ */
+ public static final String MR_AM_CONTAINERLAUNCHER_THREAD_COUNT_LIMIT =
+ MR_AM_PREFIX + "containerlauncher.thread-count-limit";
+
+ public static final int DEFAULT_MR_AM_CONTAINERLAUNCHER_THREAD_COUNT_LIMIT =
+ 500;
+
+ /** Number of threads to handle job client RPC requests. */
+ public static final String MR_AM_JOB_CLIENT_THREAD_COUNT =
+ MR_AM_PREFIX + "job.client.thread-count";
+ public static final int DEFAULT_MR_AM_JOB_CLIENT_THREAD_COUNT = 1;
+
+ /**
+ * Range of ports that the MapReduce AM can use when binding. Leave blank
+ * if you want all possible ports.
+ */
+ public static final String MR_AM_JOB_CLIENT_PORT_RANGE =
+ MR_AM_PREFIX + "job.client.port-range";
+
+ /** Enable blacklisting of nodes in the job. */
+ public static final String MR_AM_JOB_NODE_BLACKLISTING_ENABLE =
+ MR_AM_PREFIX + "job.node-blacklisting.enable";
+
+ /**
+ * Ignore blacklisting if a certain percentage of nodes have been blacklisted.
+ * The constant name misspells "PERCENT" (its default below is spelled
+ * correctly); the name is kept for source compatibility.
+ */
+ public static final String MR_AM_IGNORE_BLACKLISTING_BLACKLISTED_NODE_PERECENT =
+ MR_AM_PREFIX + "job.node-blacklisting.ignore-threshold-node-percent";
+ public static final int DEFAULT_MR_AM_IGNORE_BLACKLISTING_BLACKLISTED_NODE_PERCENT =
+ 33;
+
+ /** Enable job recovery. */
+ public static final String MR_AM_JOB_RECOVERY_ENABLE =
+ MR_AM_PREFIX + "job.recovery.enable";
+
+ /**
+ * Limit on the number of reducers that can be preempted to ensure that at
+ * least one map task can run if it needs to. Percentage between 0.0 and 1.0.
+ */
+ public static final String MR_AM_JOB_REDUCE_PREEMPTION_LIMIT =
+ MR_AM_PREFIX + "job.reduce.preemption.limit";
+ public static final float DEFAULT_MR_AM_JOB_REDUCE_PREEMPTION_LIMIT = 0.5f;
+
+ /** Whether AM access is disabled for the job. */
+ public static final String JOB_AM_ACCESS_DISABLED =
+ "mapreduce.job.am-access-disabled";
+ public static final boolean DEFAULT_JOB_AM_ACCESS_DISABLED = false;
+
+ /**
+ * Limit reduces starting until a certain percentage of maps have finished.
+ * Percentage between 0.0 and 1.0. The matching default is named
+ * {@link #DEFAULT_MR_AM_JOB_REDUCE_RAMP_UP_LIMIT} (note the different
+ * spelling of "RAMP_UP").
+ */
+ public static final String MR_AM_JOB_REDUCE_RAMPUP_UP_LIMIT =
+ MR_AM_PREFIX + "job.reduce.rampup.limit";
+ public static final float DEFAULT_MR_AM_JOB_REDUCE_RAMP_UP_LIMIT = 0.5f;
+
+ /** The class that should be used for speculative execution calculations. */
+ public static final String MR_AM_JOB_SPECULATOR =
+ MR_AM_PREFIX + "job.speculator.class";
+
+ /** Class used to estimate task resource needs. */
+ public static final String MR_AM_TASK_ESTIMATOR =
+ MR_AM_PREFIX + "job.task.estimator.class";
+
+ /** The lambda value in the smoothing function of the task estimator. */
+ public static final String MR_AM_TASK_ESTIMATOR_SMOOTH_LAMBDA_MS =
+ MR_AM_PREFIX
+ + "job.task.estimator.exponential.smooth.lambda-ms";
+
+ /** Default for {@link #MR_AM_TASK_ESTIMATOR_SMOOTH_LAMBDA_MS}: one minute in ms. */
+ public static final long DEFAULT_MR_AM_TASK_ESTIMATOR_SMOOTH_LAMBDA_MS =
+ 1000L * 60;
+
+ /** True if the smoothing rate should be exponential. */
+ public static final String MR_AM_TASK_ESTIMATOR_EXPONENTIAL_RATE_ENABLE =
+ MR_AM_PREFIX + "job.task.estimator.exponential.smooth.rate";
+
+ /** The number of threads used to handle task RPC calls. */
+ public static final String MR_AM_TASK_LISTENER_THREAD_COUNT =
+ MR_AM_PREFIX + "job.task.listener.thread-count";
+ public static final int DEFAULT_MR_AM_TASK_LISTENER_THREAD_COUNT = 30;
+
+ /**
+ * How often, in milliseconds, the AM should schedule assigning tasks to
+ * allocated containers.
+ */
+ public static final String MR_AM_SCHEDULER_INTERVAL =
+ MR_AM_PREFIX + "scheduler.interval-ms";
+ // Uppercase 'L' suffix: a lowercase 'l' is easily misread as the digit 1.
+ public static final long DEFAULT_MR_AM_SCHEDULER_INTERVAL = 1000L;
+
+ /** How often, in milliseconds, the AM should send heartbeats to the RM. */
+ public static final String MR_AM_TO_RM_HEARTBEAT_INTERVAL_MS =
+ MR_AM_PREFIX + "scheduler.heartbeat.interval-ms";
+ public static final int DEFAULT_MR_AM_TO_RM_HEARTBEAT_INTERVAL_MS = 1000;
+
+ /**
+ * If contact with the RM is lost, the AM will wait
+ * MR_AM_TO_RM_WAIT_INTERVAL_MS milliseconds before aborting. During this
+ * interval the AM will still try to contact the RM.
+ */
+ public static final String MR_AM_TO_RM_WAIT_INTERVAL_MS =
+ MR_AM_PREFIX + "scheduler.connection.wait.interval-ms";
+ public static final int DEFAULT_MR_AM_TO_RM_WAIT_INTERVAL_MS = 360000;
+
+ /**
+ * Boolean. Create the base dirs in the JobHistoryEventHandler.
+ * Set to false for multi-user clusters. This is an internal config that
+ * is set by the MR framework and read by it too.
+ */
+ public static final String MR_AM_CREATE_JH_INTERMEDIATE_BASE_DIR =
+ MR_AM_PREFIX + "create-intermediate-jh-base-dir";
+
+ public static final String MR_AM_HISTORY_MAX_UNFLUSHED_COMPLETE_EVENTS =
+ MR_AM_PREFIX + "history.max-unflushed-events";
+ public static final int DEFAULT_MR_AM_HISTORY_MAX_UNFLUSHED_COMPLETE_EVENTS =
+ 200;
+
+ public static final String MR_AM_HISTORY_JOB_COMPLETE_UNFLUSHED_MULTIPLIER =
+ MR_AM_PREFIX + "history.job-complete-unflushed-multiplier";
+ public static final int DEFAULT_MR_AM_HISTORY_JOB_COMPLETE_UNFLUSHED_MULTIPLIER =
+ 30;
+
+ public static final String MR_AM_HISTORY_COMPLETE_EVENT_FLUSH_TIMEOUT_MS =
+ MR_AM_PREFIX + "history.complete-event-flush-timeout";
+ /** Default for {@link #MR_AM_HISTORY_COMPLETE_EVENT_FLUSH_TIMEOUT_MS}: 30 seconds in ms. */
+ public static final long DEFAULT_MR_AM_HISTORY_COMPLETE_EVENT_FLUSH_TIMEOUT_MS =
+ 30 * 1000L;
+
+ public static final String MR_AM_HISTORY_USE_BATCHED_FLUSH_QUEUE_SIZE_THRESHOLD =
+ MR_AM_PREFIX + "history.use-batched-flush.queue-size.threshold";
+ public static final int DEFAULT_MR_AM_HISTORY_USE_BATCHED_FLUSH_QUEUE_SIZE_THRESHOLD =
+ 50;
+
+ public static final String MR_AM_ENV =
+ MR_AM_PREFIX + "env";
+
+ public static final String MR_AM_ADMIN_USER_ENV =
+ MR_AM_PREFIX + "admin.user.env";
+
+ // -------------------------------------------------------------------------
+ // Admin-controlled task options ("mapreduce.admin.*").
+ // -------------------------------------------------------------------------
+
+ public static final String MAPRED_MAP_ADMIN_JAVA_OPTS =
+ "mapreduce.admin.map.child.java.opts";
+
+ public static final String MAPRED_REDUCE_ADMIN_JAVA_OPTS =
+ "mapreduce.admin.reduce.child.java.opts";
+
+ // Each option ends with a trailing space so further options can be appended.
+ public static final String DEFAULT_MAPRED_ADMIN_JAVA_OPTS =
+ "-Djava.net.preferIPv4Stack=true " +
+ "-Dhadoop.metrics.log.level=WARN ";
+
+ public static final String MAPRED_ADMIN_USER_SHELL =
+ "mapreduce.admin.user.shell";
+
+ public static final String DEFAULT_SHELL = "/bin/bash";
+
+ public static final String MAPRED_ADMIN_USER_ENV =
+ "mapreduce.admin.user.env";
+
+ public static final String DEFAULT_MAPRED_ADMIN_USER_ENV =
+ "LD_LIBRARY_PATH=$HADOOP_COMMON_HOME/lib/native";
+
+ // -------------------------------------------------------------------------
+ // Well-known local names, environment variables and class names.
+ // -------------------------------------------------------------------------
+
+ public static final String WORKDIR = "work";
+
+ public static final String OUTPUT = "output";
+
+ public static final String HADOOP_WORK_DIR = "HADOOP_WORK_DIR";
+
+ // Environment variables used by Pipes. (TODO: these
+ // do not appear to be used by current pipes source code!)
+ public static final String STDOUT_LOGFILE_ENV = "STDOUT_LOGFILE_ENV";
+ public static final String STDERR_LOGFILE_ENV = "STDERR_LOGFILE_ENV";
+
+ public static final String APPLICATION_ATTEMPT_ID_ENV = "APPLICATION_ATTEMPT_ID_ENV";
+
+ // This should be the directory where the splits file gets localized on the
+ // node running the ApplicationMaster.
+ public static final String JOB_SUBMIT_DIR = "jobSubmitDir";
+
+ // This should be the name of the localized job-configuration file on the
+ // node running the ApplicationMaster and the tasks.
+ public static final String JOB_CONF_FILE = "job.xml";
+
+ // This should be the name of the localized job-jar file on the node running
+ // individual containers/tasks.
+ public static final String JOB_JAR = "job.jar";
+
+ public static final String JOB_SPLIT = "job.split";
+
+ public static final String JOB_SPLIT_METAINFO = "job.splitmetainfo";
+
+ public static final String APPLICATION_MASTER_CLASS =
+ "org.apache.tez.dag.app.DAGAppMaster";
+
+ // The token file for the application. Should contain tokens for access to
+ // the remote file system and may optionally contain application-specific
+ // tokens. For now, generated by the AppManagers and used by NodeManagers and
+ // the Containers.
+ public static final String APPLICATION_TOKENS_FILE = "appTokens";
+
+ /** The log directory for the containers. */
+ public static final String TASK_LOG_DIR = MR_PREFIX + "container.log.dir";
+
+ public static final String TASK_LOG_SIZE = MR_PREFIX + "container.log.filesize";
+
+ public static final String MAPREDUCE_V2_CHILD_CLASS =
+ "org.apache.hadoop.mapred.YarnChild";
+
+ public static final String APPLICATION_ATTEMPT_ID =
+ "mapreduce.job.application.attempt.id";
+
+ // -------------------------------------------------------------------------
+ // Job end notification.
+ // -------------------------------------------------------------------------
+
+ public static final String MR_JOB_END_NOTIFICATION_URL =
+ "mapreduce.job.end-notification.url";
+
+ public static final String MR_JOB_END_NOTIFICATION_PROXY =
+ "mapreduce.job.end-notification.proxy";
+
+ public static final String MR_JOB_END_RETRY_ATTEMPTS =
+ "mapreduce.job.end-notification.retry.attempts";
+
+ public static final String MR_JOB_END_RETRY_INTERVAL =
+ "mapreduce.job.end-notification.retry.interval";
+
+ public static final String MR_JOB_END_NOTIFICATION_MAX_ATTEMPTS =
+ "mapreduce.job.end-notification.max.attempts";
+
+ public static final String MR_JOB_END_NOTIFICATION_MAX_RETRY_INTERVAL =
+ "mapreduce.job.end-notification.max.retry.interval";
+
+ // -------------------------------------------------------------------------
+ // MR AM service authorization.
+ // -------------------------------------------------------------------------
+
+ public static final String
+ MR_AM_SECURITY_SERVICE_AUTHORIZATION_TASK_UMBILICAL =
+ "security.job.task.protocol.acl";
+ public static final String
+ MR_AM_SECURITY_SERVICE_AUTHORIZATION_CLIENT =
+ "security.job.client.protocol.acl";
+
+ /**
+ * CLASSPATH for all YARN MapReduce applications.
+ */
+ public static final String MAPREDUCE_APPLICATION_CLASSPATH =
+ "mapreduce.application.classpath";
+
+ /**
+ * Default CLASSPATH for all YARN MapReduce applications.
+ *
+ * <p>Warning: although the field is final, array elements are mutable and
+ * shared by every user of this interface. Callers must copy the array
+ * before modifying it.
+ */
+ public static final String[] DEFAULT_MAPREDUCE_APPLICATION_CLASSPATH = {
+ "$HADOOP_MAPRED_HOME/share/hadoop/mapreduce/*",
+ "$HADOOP_MAPRED_HOME/share/hadoop/mapreduce/lib/*",
+ };
+
+
+ // TODO Fix this. Not accessible in JobClient.
+ /** Whether this client needs a history-server (HS) delegation token. */
+ @InterfaceAudience.Private
+ static final String HS_DELEGATION_TOKEN_REQUIRED
+ = "mapreduce.history.server.delegationtoken.required";
+
+ // -------------------------------------------------------------------------
+ // MRR-related config properties.
+ // -------------------------------------------------------------------------
+
+ public static final String MRR_INTERMEDIATE_STAGES =
+ "mrr.intermediate.num-stages";
+
+ public static final String MRR_INTERMEDIATE_STAGE_PREFIX =
+ "mrr.intermediate.stage.";
+
+ // Stage-specific properties.
+ // Each property has the form mrr.intermediate.stage.<stage-num>.<suffix>
+ // (i.e. MRR_INTERMEDIATE_STAGE_PREFIX + <stage-num> + "." + <suffix>).
+ // The MRR_INTERMEDIATE_STAGE_* suffix constants below are currently
+ // commented out and kept only as a record of the intended suffixes.
+// public static final String MRR_INTERMEDIATE_STAGE_TASKS = "tasks";
+// public static final String MRR_INTERMEDIATE_STAGE_CLASS = "class";
+// public static final String
+// MRR_INTERMEDIATE_STAGE_PARTITIONER_CLASS = "partitioner.class";
+// public static final String
+// MRR_INTERMEDIATE_STAGE_COMBINER_CLASS = "combiner.class";
+// public static final String MRR_INTERMEDIATE_STAGE_OUTPUT_COMPRESS =
+// "output.compress";
+// public static final String MRR_INTERMEDIATE_STAGE_OUTPUT_COMPRESSION_CODEC =
+// "output.compression.codec";
+// public static final String MRR_INTERMEDIATE_STAGE_OUTPUT_KEY_CLASS =
+// "key.class";
+// public static final String MRR_INTERMEDIATE_STAGE_OUTPUT_KEY_COMPARATOR_CLASS
+// = "key.comparator.class";
+// public static final String MRR_INTERMEDIATE_STAGE_OUTPUT_VALUE_CLASS =
+// "value.class";
+// public static final String MRR_INTERMEDIATE_STAGE_SPECULATE =
+// "speculate";
+// public static final String MRR_INTERMEDIATE_STAGE_MEMORY_MB =
+// "memory.mb";
+// public static final String MRR_INTERMEDIATE_STAGE_CHILD_JAVA_OPTS =
+// "child.java.opts";
+
+}
Propchange: incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRJobConfig.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRTaskStatus.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRTaskStatus.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRTaskStatus.java (added)
+++ incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRTaskStatus.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,300 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.mapreduce.hadoop;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.mapred.TaskStatus;
+import org.apache.tez.common.TezTaskStatus;
+import org.apache.tez.common.counters.TezCounters;
+import org.apache.tez.engine.records.TezTaskAttemptID;
+
+public class MRTaskStatus implements TezTaskStatus {
+
+ static final Log LOG =
+ LogFactory.getLog(TaskStatus.class.getName());
+ // max task-status string size
+ static final int MAX_STRING_SIZE = 1024;
+
+ private TezTaskAttemptID taskAttemptId;
+ private State state = State.UNASSIGNED;
+ private float progress = 0.0f;
+ private String diagnostics = "";
+ private String userStatusInfo = "";
+ private Phase phase;
+ private TezCounters counters;
+
+ private long localOutputSize;
+ List<TezTaskAttemptID> failedTaskDependencies =
+ new ArrayList<TezTaskAttemptID>();
+
+ private long startTime;
+ private long finishTime;
+ private long sortFinishTime;
+ private long mapFinishTime;
+ private long shuffleFinishTime;
+
+ // For serialization.
+ public MRTaskStatus() {
+ }
+
+ public MRTaskStatus(
+ TezTaskAttemptID taskAttemptId,
+ TezCounters counters, Phase phase) {
+ this.taskAttemptId = taskAttemptId;
+ this.counters = counters;
+ this.phase = phase;
+ }
+
+ @Override
+ public TezTaskAttemptID getTaskAttemptId() {
+ return taskAttemptId;
+ }
+
+ @Override
+ public float getProgress() {
+ return progress;
+ }
+
+ @Override
+ public void setProgress(float progress) {
+ this.progress = progress;
+ }
+
+ @Override
+ public State getRunState() {
+ return state;
+ }
+
+ @Override
+ public void setRunState(State state) {
+ this.state = state;
+ }
+
+ @Override
+ public String getDiagnosticInfo() {
+ return diagnostics;
+ }
+
+ @Override
+ public void setDiagnosticInfo(String info) {
+ this.diagnostics = info;
+ }
+
+ @Override
+ public String getStateString() {
+ return userStatusInfo;
+ }
+
+ @Override
+ public void setStateString(String userStatusInfo) {
+ this.userStatusInfo = userStatusInfo;
+ }
+
+ @Override
+ public long getFinishTime() {
+ return finishTime;
+ }
+
+ @Override
+ public long getShuffleFinishTime() {
+ return shuffleFinishTime;
+ }
+
+ @Override
+ public long getMapFinishTime() {
+ return mapFinishTime;
+ }
+
+ @Override
+ public long getSortFinishTime() {
+ return sortFinishTime;
+ }
+
+ @Override
+ public long getStartTime() {
+ return startTime;
+ }
+
+ @Override
+ public Phase getPhase() {
+ return phase;
+ }
+
+ @Override
+ public void setPhase(Phase phase) {
+ Phase oldPhase = getPhase();
+ if (oldPhase != phase) {
+ // sort phase started
+ if (phase == Phase.SORT){
+ if (oldPhase == Phase.MAP) {
+ setMapFinishTime(System.currentTimeMillis());
+ } else {
+ setShuffleFinishTime(System.currentTimeMillis());
+ }
+ } else if (phase == Phase.REDUCE) {
+ setSortFinishTime(System.currentTimeMillis());
+ }
+ this.phase = phase;
+ }
+ }
+
+ @Override
+ public TezCounters getCounters() {
+ return counters;
+ }
+
+ @Override
+ public void setCounters(TezCounters counters) {
+ this.counters = counters;
+ }
+
+ @Override
+ public long getLocalOutputSize() {
+ return localOutputSize;
+ }
+
+ @Override
+ public List<TezTaskAttemptID> getFailedDependencies() {
+ return failedTaskDependencies;
+ }
+
+ @Override
+ public void addFailedDependency(TezTaskAttemptID taskAttemptId) {
+ failedTaskDependencies.add(taskAttemptId);
+ }
+
+ @Override
+ synchronized public void clearStatus() {
+ userStatusInfo = "";
+ failedTaskDependencies.clear();
+ }
+
+ @Override
+ synchronized public void statusUpdate(
+ float progress, String userDiagnosticInfo, TezCounters counters) {
+ setProgress(progress);
+ setDiagnosticInfo(userDiagnosticInfo);
+ setCounters(counters);
+ }
+
+ @Override
+ public void setOutputSize(long localOutputSize) {
+ this.localOutputSize = localOutputSize;
+ }
+
+ @Override
+ public void setFinishTime(long finishTime) {
+ if(this.getStartTime() > 0 && finishTime > 0) {
+ if (getShuffleFinishTime() == 0) {
+ setShuffleFinishTime(finishTime);
+ }
+ if (getSortFinishTime() == 0){
+ setSortFinishTime(finishTime);
+ }
+ if (getMapFinishTime() == 0) {
+ setMapFinishTime(finishTime);
+ }
+ this.finishTime = finishTime;
+ }
+ }
+
+ @Override
+ public void setShuffleFinishTime(long shuffleFinishTime) {
+ this.shuffleFinishTime = shuffleFinishTime;
+ }
+
+ @Override
+ public void setMapFinishTime(long mapFinishTime) {
+ this.mapFinishTime = mapFinishTime;
+ }
+
+ @Override
+ public void setSortFinishTime(long sortFinishTime) {
+ this.sortFinishTime = sortFinishTime;
+ if (getShuffleFinishTime() == this.shuffleFinishTime ){
+ setShuffleFinishTime(sortFinishTime);
+ }
+ }
+
+ @Override
+ public void setStartTime(long startTime) {
+ this.startTime = startTime;
+ }
+
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ taskAttemptId.write(out);
+ WritableUtils.writeEnum(out, state);
+ out.writeFloat(progress);
+ WritableUtils.writeString(out, diagnostics);
+ WritableUtils.writeString(out, userStatusInfo);
+ WritableUtils.writeEnum(out, phase);
+
+ counters.write(out);
+
+ out.writeLong(localOutputSize);
+ out.writeLong(startTime);
+ out.writeLong(finishTime);
+ out.writeLong(sortFinishTime);
+ out.writeLong(mapFinishTime);
+ out.writeLong(shuffleFinishTime);
+
+ out.writeInt(failedTaskDependencies.size());
+ for(TezTaskAttemptID taskAttemptId : failedTaskDependencies) {
+ taskAttemptId.write(out);
+ }
+ }
+
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ taskAttemptId = TezTaskAttemptID.read(in);
+ state = WritableUtils.readEnum(in, State.class);
+ progress = in.readFloat();
+ diagnostics = WritableUtils.readString(in);
+ userStatusInfo = WritableUtils.readString(in);
+ phase = WritableUtils.readEnum(in, Phase.class);
+ counters = new TezCounters();
+
+ counters.readFields(in);
+
+ localOutputSize = in.readLong();
+ startTime = in.readLong();
+ finishTime = in.readLong();
+ sortFinishTime = in.readLong();
+ mapFinishTime = in.readLong();
+ shuffleFinishTime = in.readLong();
+
+ int numFailedDependencies = in.readInt();
+ for (int i = 0 ; i < numFailedDependencies ; i++) {
+ TezTaskAttemptID taskAttemptId = TezTaskAttemptID.read(in);
+ failedTaskDependencies.add(taskAttemptId);
+ }
+
+ }
+
+}
Propchange: incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRTaskStatus.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRTaskType.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRTaskType.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRTaskType.java (added)
+++ incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRTaskType.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.mapreduce.hadoop;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * Enum for map, reduce, job-setup, job-cleanup, task-cleanup task types.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Stable
public enum MRTaskType {

  MAP, REDUCE, JOB_SETUP, JOB_CLEANUP, TASK_CLEANUP;

  /**
   * Short display form: "m" for MAP, "r" for REDUCE, and the constant's
   * {@link #name()} for every other type.
   */
  @Override
  public String toString() {
    if (this == MAP) {
      return "m";
    }
    if (this == REDUCE) {
      return "r";
    }
    return name();
  }

  /**
   * Parses either the short form produced by {@link #toString()} ("m"/"r")
   * or any constant name accepted by {@link #valueOf(String)}.
   *
   * @throws IllegalArgumentException if the string matches neither form
   */
  public static MRTaskType fromString(String taskTypeString) {
    if (taskTypeString.equals(MAP.toString())) {
      return MAP;
    }
    if (taskTypeString.equals(REDUCE.toString())) {
      return REDUCE;
    }
    return valueOf(taskTypeString);
  }

  /** Returns the constant name, e.g. "MAP", suitable for round-tripping. */
  public String toSerializedString() {
    return name();
  }
}
Propchange: incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRTaskType.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MultiStageMRConfigUtil.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MultiStageMRConfigUtil.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MultiStageMRConfigUtil.java (added)
+++ incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MultiStageMRConfigUtil.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,120 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.mapreduce.hadoop;
+
+import java.util.Iterator;
+import java.util.Map.Entry;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.lib.reduce.IntSumReducer;
+
+import com.google.common.base.Preconditions;
+
+public class MultiStageMRConfigUtil {
+
+ // TODO MRR FIXME based on conf format.
+ // Returns a complete conf object including non-intermediate stage conf.
+ public static Configuration getIntermediateStageConf(Configuration baseConf,
+ int i) {
+ String base = getPropertyNameForStage(i, "");
+ Configuration conf = new Configuration(false);
+ Iterator<Entry<String, String>> confEntries = baseConf.iterator();
+ while (confEntries.hasNext()) {
+ Entry<String, String> entry = confEntries.next();
+ String key = entry.getKey();
+ if (key.startsWith(base)) {
+ conf.set(key.replace(base, ""), entry.getValue());
+ } else {
+ conf.set(key, entry.getValue());
+ }
+ }
+ return conf;
+ }
+
+ // TODO MRR FIXME based on conf format.
+ // Returns config settings specific to stage-i only.
+ public static Configuration getBasicIntermediateStageConf(
+ Configuration baseConf, int i) {
+ String base = getPropertyNameForStage(i, "");
+ Configuration conf = new Configuration(false);
+ Iterator<Entry<String, String>> confEntries = baseConf.iterator();
+ while (confEntries.hasNext()) {
+ Entry<String, String> entry = confEntries.next();
+ String key = entry.getKey();
+ if (key.startsWith(base)) {
+ conf.set(key.replace(base, ""), entry.getValue());
+ }
+ }
+ return conf;
+ }
+
+ // TODO MRR FIXME based on conf format.
+ public static int getNumIntermediateStages(Configuration conf) {
+ return conf.getInt(MRJobConfig.MRR_INTERMEDIATE_STAGES, 0);
+ }
+
+ // TODO MRR FIXME based on conf format.
+ public static String getPropertyNameForStage(int intermediateStage,
+ String originalPropertyName) {
+ return MRJobConfig.MRR_INTERMEDIATE_STAGE_PREFIX + intermediateStage + "."
+ + originalPropertyName;
+ }
+
+ public static void main(String[] args) {
+ Configuration baseConf = new Configuration();
+ baseConf.setInt(MRJobConfig.MRR_INTERMEDIATE_STAGES, 1);
+ baseConf.setClass(MultiStageMRConfigUtil.getPropertyNameForStage(1,
+ "mapreduce.job.combine.class"), IntSumReducer.class, Reducer.class);
+ baseConf.setClass(MultiStageMRConfigUtil.getPropertyNameForStage(1,
+ "mapreduce.job.reduce.class"), IntSumReducer.class, Reducer.class);
+
+ Configuration conf = getBasicIntermediateStageConf(baseConf, 1);
+ printConf(conf);
+ }
+
+ private static String IREDUCE_PREFIX = "ireduce";
+
+ public static String getIntermediateReduceVertexName(int i) {
+ return "ireduce" + i;
+ }
+
+ public static boolean isIntermediateReduceStage(String vertexName) {
+ return vertexName.startsWith(IREDUCE_PREFIX);
+ }
+
+ public static int getIntermediateReduceStageNum(String vertexName) {
+ Preconditions.checkArgument(vertexName.startsWith(IREDUCE_PREFIX),
+ "IntermediateReduce vertex name must start with prefix: "
+ + IREDUCE_PREFIX);
+ String stageNumString = vertexName.substring(IREDUCE_PREFIX.length());
+ return Integer.valueOf(stageNumString);
+ }
+
+ // TODO Get rid of this. Temporary for testing.
+ public static void printConf(Configuration conf) {
+ Iterator<Entry<String, String>> confEntries = conf.iterator();
+ while (confEntries.hasNext()) {
+ Entry<String, String> entry = confEntries.next();
+ String key = entry.getKey();
+ String value = entry.getValue();
+ System.err.println("Key: " + key + ", Value: " + value);
+ }
+ }
+}
Propchange: incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MultiStageMRConfigUtil.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/TezTaskUmbilicalProtocol.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/TezTaskUmbilicalProtocol.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/TezTaskUmbilicalProtocol.java (added)
+++ incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/TezTaskUmbilicalProtocol.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.mapreduce.hadoop;
+
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.tez.common.ContainerTask;
+import org.apache.tez.common.TezTaskStatus;
+import org.apache.tez.engine.api.Master;
+import org.apache.tez.engine.records.OutputContext;
+import org.apache.tez.engine.records.TezTaskAttemptID;
+import org.apache.tez.mapreduce.hadoop.records.ProceedToCompletionResponse;
+
+/** Protocol that task child process uses to contact its parent process. The
+ * parent is a daemon which which polls the central master for a new map or
+ * reduce task and runs it as a child process. All communication between child
+ * and parent is via this protocol. */
+@InterfaceAudience.Private
+@InterfaceStability.Stable
+public interface TezTaskUmbilicalProtocol extends Master {
+
+ public static final long versionID = 19L;
+
+ ContainerTask getTask(ContainerContext containerContext) throws IOException;
+
+ boolean statusUpdate(TezTaskAttemptID taskId, TezTaskStatus taskStatus)
+ throws IOException, InterruptedException;
+
+ void reportDiagnosticInfo(TezTaskAttemptID taskid, String trace) throws IOException;
+
+ boolean ping(TezTaskAttemptID taskid) throws IOException;
+
+ void done(TezTaskAttemptID taskid) throws IOException;
+
+ void commitPending(TezTaskAttemptID taskId, TezTaskStatus taskStatus)
+ throws IOException, InterruptedException;
+
+ boolean canCommit(TezTaskAttemptID taskid) throws IOException;
+
+ void shuffleError(TezTaskAttemptID taskId, String message) throws IOException;
+
+ void fsError(TezTaskAttemptID taskId, String message) throws IOException;
+
+ void fatalError(TezTaskAttemptID taskId, String message) throws IOException;
+
+ // TODO TEZAM5 Can commitPending and outputReady be collapsed into a single
+ // call.
+ // IAC outputReady followed by commit is a little confusing - since the output
+ // isn't really in place till a commit is called. Maybe rename to
+ // processingComplete or some such.
+
+ // TODO EVENTUALLY This is not the most useful API. Once there's some kind of
+ // support for the Task handing output over to the Container, this won't rally
+ // be required. i.e. InMemShuffle running as a service in the Container, or
+ // the second task in getTask(). ContainerUmbilical would include getTask and
+ // getServices...
+
+ void outputReady(TezTaskAttemptID taskAttemptId, OutputContext outputContext)
+ throws IOException;
+
+ ProceedToCompletionResponse
+ proceedToCompletion(TezTaskAttemptID taskAttemptId) throws IOException;
+}
Propchange: incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/TezTaskUmbilicalProtocol.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/TezTypeConverters.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/TezTypeConverters.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/TezTypeConverters.java (added)
+++ incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/TezTypeConverters.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.tez.mapreduce.hadoop;
+
+import org.apache.hadoop.mapred.TaskAttemptID;
+import org.apache.hadoop.mapreduce.Counter;
+import org.apache.hadoop.mapreduce.Counters;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.TypeConverter;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptCompletionEventStatus;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
+import org.apache.tez.common.TezTaskStatus.Phase;
+import org.apache.tez.common.counters.CounterGroup;
+import org.apache.tez.common.counters.TezCounter;
+import org.apache.tez.common.counters.TezCounters;
+import org.apache.tez.engine.records.TezDAGID;
+import org.apache.tez.engine.records.TezDependentTaskCompletionEvent;
+import org.apache.tez.engine.records.TezTaskAttemptID;
+
+public class TezTypeConverters {
+
+ // Tez objects will be imported. Others will use the fully qualified name when
+ // required.
+ // All public methods named toYarn / toTez / toMapReduce
+
+ public static org.apache.hadoop.mapreduce.JobID toJobID(TezDAGID id) {
+ return new JobID(
+ String.valueOf(id.getApplicationId().getClusterTimestamp()), id.getId());
+
+ }
+
+ public static org.apache.hadoop.mapreduce.v2.api.records.Phase toYarn(
+ Phase phase) {
+ return org.apache.hadoop.mapreduce.v2.api.records.Phase.valueOf(phase
+ .name());
+ }
+
+ public static TaskAttemptId toYarn(TezTaskAttemptID taskAttemptId) {
+ TaskAttemptID mrTaskAttemptId = IDConverter
+ .toMRTaskAttemptId(taskAttemptId);
+ TaskAttemptId mrv2TaskAttemptId = TypeConverter.toYarn(mrTaskAttemptId);
+ return mrv2TaskAttemptId;
+ }
+
+ public static TezTaskAttemptID toTez(TaskAttemptId taskAttemptId) {
+ TaskAttemptID mrTaskAttemptId = TypeConverter.fromYarn(taskAttemptId);
+ TezTaskAttemptID tezTaskAttemptId = IDConverter
+ .fromMRTaskAttemptId(mrTaskAttemptId);
+ return tezTaskAttemptId;
+ }
+
+ public static TezDependentTaskCompletionEvent.Status toTez(
+ TaskAttemptCompletionEventStatus status) {
+ return TezDependentTaskCompletionEvent.Status.valueOf(status.toString());
+ }
+
+
+
+ public static Counters fromTez(TezCounters tezCounters) {
+ if (tezCounters == null) {
+ return null;
+ }
+ Counters counters = new Counters();
+ for (CounterGroup xGrp : tezCounters) {
+ counters.addGroup(xGrp.getName(), xGrp.getDisplayName());
+ for (TezCounter xCounter : xGrp) {
+ Counter counter =
+ counters.findCounter(xGrp.getName(), xCounter.getName());
+ counter.setValue(xCounter.getValue());
+
+ }
+ }
+ return counters;
+ }
+}
Propchange: incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/TezTypeConverters.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/mapred/JobContextImpl.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/mapred/JobContextImpl.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/mapred/JobContextImpl.java (added)
+++ incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/mapred/JobContextImpl.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,65 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.mapreduce.hadoop.mapred;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.JobContext;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.util.Progressable;
+import org.apache.tez.engine.records.TezDAGID;
+import org.apache.tez.mapreduce.hadoop.IDConverter;
+
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class JobContextImpl
+ extends org.apache.hadoop.mapreduce.task.JobContextImpl
+ implements JobContext {
+ private JobConf job;
+ private Progressable progress;
+
+ public JobContextImpl(JobConf conf, TezDAGID dagId,
+ Progressable progress) {
+ super(conf, IDConverter.toMRJobId(dagId));
+ this.job = conf;
+ this.progress = progress;
+ }
+
+ public JobContextImpl(JobConf conf, TezDAGID dagId) {
+ this(conf, dagId, Reporter.NULL);
+ }
+
+ /**
+ * Get the job Configuration
+ *
+ * @return JobConf
+ */
+ public JobConf getJobConf() {
+ return job;
+ }
+
+ /**
+ * Get the progress mechanism for reporting progress.
+ *
+ * @return progress mechanism
+ */
+ public Progressable getProgressible() {
+ return progress;
+ }
+}
Propchange: incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/mapred/JobContextImpl.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/mapred/MRCounters.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/mapred/MRCounters.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/mapred/MRCounters.java (added)
+++ incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/mapred/MRCounters.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,312 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.mapreduce.hadoop.mapred;
+
+import static org.apache.hadoop.mapreduce.util.CountersStrings.toEscapedCompactString;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Iterator;
+
+import org.apache.commons.collections.IteratorUtils;
+import org.apache.commons.logging.Log;
+
+/**
+ * Adapter that exposes a Tez
+ * {@link org.apache.tez.common.counters.TezCounters} instance through the
+ * old-API {@link org.apache.hadoop.mapred.Counters} type.
+ *
+ * <p>All reads and updates are forwarded to the wrapped Tez counters. Groups
+ * and counters are handed out as {@link MRCounterGroup} / {@link MRCounter}
+ * wrappers that are created on each call and hold no state of their own.
+ */
+public class MRCounters extends org.apache.hadoop.mapred.Counters {
+  /** The Tez counters every operation delegates to. */
+  private final org.apache.tez.common.counters.TezCounters raw;
+
+  public MRCounters(org.apache.tez.common.counters.TezCounters raw) {
+    this.raw = raw;
+  }
+
+  @Override
+  public synchronized org.apache.hadoop.mapred.Counters.Group getGroup(String groupName) {
+    return new MRCounterGroup(raw.getGroup(groupName));
+  }
+
+  @SuppressWarnings("unchecked")
+  @Override
+  public synchronized Collection<String> getGroupNames() {
+    return IteratorUtils.toList(raw.getGroupNames().iterator());
+  }
+
+  /**
+   * Iterates over the groups of the wrapped Tez counters, each wrapped in an
+   * {@link MRCounterGroup}.
+   *
+   * <p>Overridden because the inherited iterator walks the superclass's own
+   * group storage, which the methods of this adapter do not populate. Without
+   * this override {@link #makeCompactString()}, {@link #log(Log)} and any
+   * inherited code that iterates over {@code this} would see no counters.
+   */
+  @Override
+  public Iterator<Group> iterator() {
+    final Iterator<org.apache.tez.common.counters.CounterGroup> groups =
+        raw.iterator();
+    return new Iterator<Group>() {
+      @Override
+      public boolean hasNext() {
+        return groups.hasNext();
+      }
+
+      @Override
+      public Group next() {
+        return new MRCounterGroup(groups.next());
+      }
+
+      @Override
+      public void remove() {
+        groups.remove();
+      }
+    };
+  }
+
+  @Override
+  public synchronized String makeCompactString() {
+    StringBuilder builder = new StringBuilder();
+    boolean first = true;
+    for (Group group : this) {
+      for (Counter counter : group) {
+        if (first) {
+          first = false;
+        } else {
+          builder.append(',');
+        }
+        builder.append(group.getDisplayName());
+        builder.append('.');
+        builder.append(counter.getDisplayName());
+        builder.append(':');
+        builder.append(counter.getCounter());
+      }
+    }
+    return builder.toString();
+  }
+
+  @Override
+  public synchronized Counter findCounter(String group, String name) {
+    return new MRCounter(raw.findCounter(group, name));
+  }
+
+  /** The numeric {@code id} is ignored; lookup is by group and name only. */
+  @Override
+  public Counter findCounter(String group, int id, String name) {
+    return new MRCounter(raw.findCounter(group, name));
+  }
+
+  @Override
+  public void incrCounter(Enum<?> key, long amount) {
+    raw.findCounter(key).increment(amount);
+  }
+
+  @Override
+  public void incrCounter(String group, String counter, long amount) {
+    raw.findCounter(group, counter).increment(amount);
+  }
+
+  @Override
+  public synchronized long getCounter(Enum<?> key) {
+    return raw.findCounter(key).getValue();
+  }
+
+  /**
+   * Adds every counter of {@code other} into the wrapped Tez counters,
+   * creating groups/counters as needed and copying their display names.
+   */
+  @Override
+  public synchronized void incrAllCounters(
+      org.apache.hadoop.mapred.Counters other) {
+    for (Group otherGroup : other) {
+      Group group = getGroup(otherGroup.getName());
+      group.setDisplayName(otherGroup.getDisplayName());
+      for (Counter otherCounter : otherGroup) {
+        Counter counter = group.getCounterForName(otherCounter.getName());
+        counter.setDisplayName(otherCounter.getDisplayName());
+        counter.increment(otherCounter.getValue());
+      }
+    }
+  }
+
+  @Override
+  public int size() {
+    return countCounters();
+  }
+
+  @Override
+  public void log(Log log) {
+    log.info("Counters: " + size());
+    for (Group group : this) {
+      log.info(" " + group.getDisplayName());
+      for (Counter counter : group) {
+        log.info(" " + counter.getDisplayName() + "=" +
+            counter.getCounter());
+      }
+    }
+  }
+
+  @Override
+  public String makeEscapedCompactString() {
+    return toEscapedCompactString(this);
+  }
+
+  /**
+   * Old-API counter group view over a Tez
+   * {@link org.apache.tez.common.counters.CounterGroup}. Every operation is
+   * forwarded to the wrapped Tez group.
+   */
+  public static class MRCounterGroup extends org.apache.hadoop.mapred.Counters.Group {
+    private final org.apache.tez.common.counters.CounterGroup group;
+
+    public MRCounterGroup(org.apache.tez.common.counters.CounterGroup group) {
+      this.group = group;
+    }
+
+    @Override
+    public String getName() {
+      return group.getName();
+    }
+
+    @Override
+    public String getDisplayName() {
+      return group.getDisplayName();
+    }
+
+    @Override
+    public void setDisplayName(String displayName) {
+      group.setDisplayName(displayName);
+    }
+
+    /** Adds a Tez copy of {@code counter} (see {@code convert}) to the group. */
+    @Override
+    public void addCounter(org.apache.hadoop.mapred.Counters.Counter counter) {
+      group.addCounter(convert(counter));
+    }
+
+    @Override
+    public org.apache.hadoop.mapred.Counters.Counter addCounter(String name,
+        String displayName, long value) {
+      return new MRCounter(group.addCounter(name, displayName, value));
+    }
+
+    @Override
+    public org.apache.hadoop.mapred.Counters.Counter findCounter(
+        String counterName, String displayName) {
+      return new MRCounter(group.findCounter(counterName, displayName));
+    }
+
+    /**
+     * Looks up a counter by name in the wrapped Tez group. Overridden so the
+     * lookup (also used by {@code getCounterForName}) goes to the Tez group
+     * rather than the base class's own delegate, which this adapter never sets.
+     */
+    @Override
+    public org.apache.hadoop.mapred.Counters.Counter findCounter(
+        String counterName) {
+      return new MRCounter(group.findCounter(counterName));
+    }
+
+    /**
+     * Returns the value of the named counter, or 0 if the group has no
+     * counter with that name. The group is scanned, so no counter is created.
+     */
+    @Override
+    public long getCounter(String counterName) {
+      for (org.apache.tez.common.counters.TezCounter counter : group) {
+        if (counter.getName().equals(counterName)) {
+          return counter.getValue();
+        }
+      }
+      return 0;
+    }
+
+    @Override
+    public String makeEscapedCompactString() {
+      return toEscapedCompactString(this);
+    }
+
+    @Override
+    public int size() {
+      return group.size();
+    }
+
+    /**
+     * Adds each counter of {@code rightGroup} into the matching counter of
+     * this group (looked up by name and display name, created if missing).
+     * The previous implementation called itself on a new wrapper and
+     * therefore never terminated.
+     */
+    @Override
+    public void incrAllCounters(
+        org.apache.hadoop.mapreduce.counters.CounterGroupBase rightGroup) {
+      for (Object element : rightGroup) {
+        org.apache.hadoop.mapreduce.Counter right =
+            (org.apache.hadoop.mapreduce.Counter) element;
+        group.findCounter(right.getName(), right.getDisplayName())
+            .increment(right.getValue());
+      }
+    }
+
+    /**
+     * This adapter is itself the group view callers should use, so it
+     * returns {@code this}. The previous implementation recursed forever.
+     */
+    @Override
+    public org.apache.hadoop.mapreduce.counters.CounterGroupBase
+        getUnderlyingGroup() {
+      return this;
+    }
+
+    /** Deserializes into the wrapped Tez group (consistent with {@link MRCounter}). */
+    @Override
+    public void readFields(DataInput in) throws IOException {
+      group.readFields(in);
+    }
+
+    /** Serializes the wrapped Tez group (consistent with {@link MRCounter}). */
+    @Override
+    public void write(DataOutput out) throws IOException {
+      group.write(out);
+    }
+
+    /**
+     * Iterates over the group's counters, each wrapped in an
+     * {@link MRCounter}, so callers looping over old-API counters do not
+     * receive raw Tez counters.
+     */
+    @Override
+    public Iterator<org.apache.hadoop.mapred.Counters.Counter> iterator() {
+      final Iterator<org.apache.tez.common.counters.TezCounter> counters =
+          group.iterator();
+      return new Iterator<org.apache.hadoop.mapred.Counters.Counter>() {
+        @Override
+        public boolean hasNext() {
+          return counters.hasNext();
+        }
+
+        @Override
+        public org.apache.hadoop.mapred.Counters.Counter next() {
+          return new MRCounter(counters.next());
+        }
+
+        @Override
+        public void remove() {
+          counters.remove();
+        }
+      };
+    }
+  }
+
+  /**
+   * Old-API counter view over a Tez
+   * {@link org.apache.tez.common.counters.TezCounter}. Reads, writes and
+   * serialization are forwarded to the wrapped Tez counter.
+   */
+  public static class MRCounter extends Counter {
+    private final org.apache.tez.common.counters.TezCounter raw;
+
+    public MRCounter(org.apache.tez.common.counters.TezCounter raw) {
+      this.raw = raw;
+    }
+
+    @Override
+    public void setDisplayName(String displayName) {
+      raw.setDisplayName(displayName);
+    }
+
+    @Override
+    public String getName() {
+      return raw.getName();
+    }
+
+    @Override
+    public String getDisplayName() {
+      return raw.getDisplayName();
+    }
+
+    @Override
+    public long getValue() {
+      return raw.getValue();
+    }
+
+    @Override
+    public void setValue(long value) {
+      raw.setValue(value);
+    }
+
+    @Override
+    public void increment(long incr) {
+      raw.increment(incr);
+    }
+
+    @Override
+    public void write(DataOutput out) throws IOException {
+      raw.write(out);
+    }
+
+    @Override
+    public void readFields(DataInput in) throws IOException {
+      raw.readFields(in);
+    }
+
+    @Override
+    public String makeEscapedCompactString() {
+      return toEscapedCompactString(this);
+    }
+
+    /**
+     * Compares name, display name and value with {@code counter}.
+     *
+     * @return false if {@code counter} is null or any of the three differ
+     */
+    @Override
+    @Deprecated
+    public boolean contentEquals(Counter counter) {
+      return counter != null
+          && getName().equals(counter.getName())
+          && getDisplayName().equals(counter.getDisplayName())
+          && getValue() == counter.getValue();
+    }
+
+    @Override
+    public long getCounter() {
+      return raw.getValue();
+    }
+
+    /**
+     * This adapter is itself a {@code mapreduce.Counter}, so it returns
+     * {@code this}. The previous implementation recursed forever.
+     */
+    @Override
+    public org.apache.hadoop.mapreduce.Counter getUnderlyingCounter() {
+      return this;
+    }
+
+    /**
+     * Two {@code MRCounter}s are equal when their wrapped Tez counters are
+     * equal; any other argument is compared against the wrapped Tez counter
+     * directly. Unwrapping makes {@code equals} reflexive, which the previous
+     * implementation was not, and stays consistent with {@link #hashCode()}.
+     */
+    @Override
+    public synchronized boolean equals(Object genericRight) {
+      if (genericRight instanceof MRCounter) {
+        return raw.equals(((MRCounter) genericRight).raw);
+      }
+      return raw.equals(genericRight);
+    }
+
+    @Override
+    public int hashCode() {
+      return raw.hashCode();
+    }
+  }
+
+  /**
+   * Converts an old-API counter into a Tez counter, preserving its value.
+   * Framework and file-system counters keep their enum identity; anything
+   * else becomes a generic counter.
+   */
+  static org.apache.tez.common.counters.TezCounter convert(
+      org.apache.hadoop.mapred.Counters.Counter counter) {
+    org.apache.hadoop.mapreduce.Counter underlyingCounter =
+        counter.getUnderlyingCounter();
+    if (underlyingCounter instanceof org.apache.hadoop.mapreduce.counters.FrameworkCounterGroup.FrameworkCounter) {
+      org.apache.hadoop.mapreduce.counters.FrameworkCounterGroup.FrameworkCounter real =
+          (org.apache.hadoop.mapreduce.counters.FrameworkCounterGroup.FrameworkCounter) underlyingCounter;
+      org.apache.tez.common.counters.TezCounter converted =
+          new org.apache.tez.common.counters.FrameworkCounterGroup.FrameworkCounter(
+              real.getKey(), real.getGroupName());
+      // The constructor does not take a value; copy it so it is not lost.
+      converted.setValue(real.getValue());
+      return converted;
+    } else if (underlyingCounter instanceof org.apache.hadoop.mapreduce.counters.FileSystemCounterGroup.FSCounter) {
+      org.apache.hadoop.mapreduce.counters.FileSystemCounterGroup.FSCounter real =
+          (org.apache.hadoop.mapreduce.counters.FileSystemCounterGroup.FSCounter) underlyingCounter;
+      org.apache.tez.common.counters.TezCounter converted =
+          new org.apache.tez.common.counters.FileSystemCounterGroup.FSCounter(
+              real.getScheme(), convert(real.getFileSystemCounter()));
+      // The constructor does not take a value; copy it so it is not lost.
+      converted.setValue(real.getValue());
+      return converted;
+    } else {
+      return new org.apache.tez.common.counters.GenericCounter(
+          underlyingCounter.getName(),
+          underlyingCounter.getDisplayName(),
+          underlyingCounter.getValue());
+    }
+  }
+
+  /**
+   * Maps a Hadoop file-system counter enum onto the equivalent Tez enum.
+   *
+   * @throws IllegalArgumentException for a constant with no Tez equivalent
+   */
+  static org.apache.tez.common.counters.FileSystemCounter convert(
+      org.apache.hadoop.mapreduce.FileSystemCounter c) {
+    switch (c) {
+      case BYTES_READ:
+        return org.apache.tez.common.counters.FileSystemCounter.BYTES_READ;
+      case BYTES_WRITTEN:
+        return org.apache.tez.common.counters.FileSystemCounter.BYTES_WRITTEN;
+      case READ_OPS:
+        return org.apache.tez.common.counters.FileSystemCounter.READ_OPS;
+      case LARGE_READ_OPS:
+        return org.apache.tez.common.counters.FileSystemCounter.LARGE_READ_OPS;
+      case WRITE_OPS:
+        return org.apache.tez.common.counters.FileSystemCounter.WRITE_OPS;
+      default:
+        throw new IllegalArgumentException("Unknown FileSystemCounter: " + c);
+    }
+  }
+}
Propchange: incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/mapred/MRCounters.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/mapred/TaskAttemptContextImpl.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/mapred/TaskAttemptContextImpl.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/mapred/TaskAttemptContextImpl.java (added)
+++ incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/mapred/TaskAttemptContextImpl.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,110 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.mapreduce.hadoop.mapred;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.TaskAttemptContext;
+import org.apache.hadoop.mapred.TaskAttemptID;
+import org.apache.hadoop.mapred.TaskID;
+import org.apache.hadoop.mapreduce.Counter;
+import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.util.Progressable;
+import org.apache.tez.engine.records.TezTaskAttemptID;
+import org.apache.tez.mapreduce.processor.MRTaskReporter;
+
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class TaskAttemptContextImpl
+ extends org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
+ implements TaskAttemptContext {
+ private MRTaskReporter reporter;
+
+ public TaskAttemptContextImpl(JobConf conf, TezTaskAttemptID taskid) {
+ this(conf, taskid, null);
+ }
+
+ // FIXME we need to use DAG Id but we are using App Id
+ public TaskAttemptContextImpl(JobConf conf, TezTaskAttemptID taskAttemptId,
+ MRTaskReporter reporter) {
+ super(conf,
+ new TaskAttemptID(
+ new TaskID(
+ Long.toString(taskAttemptId.getTaskID().getVertexID().
+ getDAGId().getApplicationId().getClusterTimestamp()),
+ taskAttemptId.getTaskID().getVertexID().getDAGId().
+ getApplicationId().getId(),
+ (taskAttemptId.getTaskID().getVertexID().getId() == 0 ?
+ TaskType.MAP : TaskType.REDUCE),
+ taskAttemptId.getTaskID().getId()),
+ taskAttemptId.getId()));
+ this.reporter = reporter;
+ }
+
+ /**
+ * Get the taskAttemptID.
+ *
+ * @return TaskAttemptID
+ */
+ public TaskAttemptID getTaskAttemptID() {
+ return (TaskAttemptID) super.getTaskAttemptID();
+ }
+
+ public Progressable getProgressible() {
+ return reporter;
+ }
+
+ public JobConf getJobConf() {
+ return (JobConf) getConfiguration();
+ }
+
+ @Override
+ public float getProgress() {
+ return reporter.getProgress();
+ }
+
+ @Override
+ public Counter getCounter(Enum<?> counterName) {
+ return (Counter) reporter.getCounter(counterName);
+ }
+
+ @Override
+ public Counter getCounter(String groupName, String counterName) {
+ return (Counter) reporter.getCounter(groupName, counterName);
+ }
+
+ /**
+ * Report progress.
+ */
+ @Override
+ public void progress() {
+ reporter.progress();
+ }
+
+ /**
+ * Set the current status of the task to the given string.
+ */
+ @Override
+ public void setStatus(String status) {
+ setStatusString(status);
+ reporter.setStatus(status);
+ }
+
+
+}
Propchange: incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/mapred/TaskAttemptContextImpl.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/mapreduce/JobContextImpl.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/mapreduce/JobContextImpl.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/mapreduce/JobContextImpl.java (added)
+++ incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/mapreduce/JobContextImpl.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,450 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.mapreduce.hadoop.mapreduce;
+
+import java.io.IOException;
+import java.net.URI;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configuration.IntegerRanges;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.RawComparator;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.JobContext;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.Partitioner;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.filecache.DistributedCache;
+import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
+import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.util.Progressable;
+import org.apache.tez.engine.records.TezDAGID;
+import org.apache.tez.mapreduce.hadoop.IDConverter;
+import org.apache.tez.mapreduce.hadoop.MRJobConfig;
+import org.apache.tez.mapreduce.processor.MRTaskReporter;
+
+/**
+ * A read-only view of the job that is provided to the tasks while they
+ * are running.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class JobContextImpl implements JobContext {
+
+ protected final org.apache.hadoop.mapred.JobConf conf;
+ private TezDAGID dagId;
+ /**
+ * The UserGroupInformation object that has a reference to the current user
+ */
+ protected UserGroupInformation ugi;
+ protected final Credentials credentials;
+ private Progressable progress;
+
+ public JobContextImpl(Configuration conf, TezDAGID dagId) {
+ this(conf, dagId, MRTaskReporter.NULL);
+ }
+
+ public JobContextImpl(Configuration conf, TezDAGID dagId, Progressable progress) {
+ if (conf instanceof JobConf) {
+ this.conf = (JobConf)conf;
+ } else {
+ this.conf = new JobConf(conf);
+ }
+ this.dagId = dagId;
+ this.credentials = this.conf.getCredentials();
+ try {
+ this.ugi = UserGroupInformation.getCurrentUser();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ this.progress = progress;
+ }
+
+ /**
+ * Return the configuration for the job.
+ * @return the shared configuration object
+ */
+ public Configuration getConfiguration() {
+ return conf;
+ }
+
+ /**
+ * Get the unique ID for the job.
+ * @return the object with the job id
+ */
+ public JobID getJobID() {
+ return IDConverter.toMRJobId(dagId);
+ }
+
+ /**
+ * Set the JobID.
+ */
+ public void setJobID(JobID jobId) {
+ this.dagId = IDConverter.fromMRJobId(jobId);
+ }
+
+ /**
+ * Get configured the number of reduce tasks for this job. Defaults to
+ * <code>1</code>.
+ * @return the number of reduce tasks for this job.
+ */
+ public int getNumReduceTasks() {
+ return conf.getNumReduceTasks();
+ }
+
+ /**
+ * Get the current working directory for the default file system.
+ *
+ * @return the directory name.
+ */
+ public Path getWorkingDirectory() throws IOException {
+ return conf.getWorkingDirectory();
+ }
+
+ /**
+ * Get the key class for the job output data.
+ * @return the key class for the job output data.
+ */
+ public Class<?> getOutputKeyClass() {
+ return conf.getOutputKeyClass();
+ }
+
+ /**
+ * Get the value class for job outputs.
+ * @return the value class for job outputs.
+ */
+ public Class<?> getOutputValueClass() {
+ return conf.getOutputValueClass();
+ }
+
+ /**
+ * Get the key class for the map output data. If it is not set, use the
+ * (final) output key class. This allows the map output key class to be
+ * different than the final output key class.
+ * @return the map output key class.
+ */
+ public Class<?> getMapOutputKeyClass() {
+ return conf.getMapOutputKeyClass();
+ }
+
+ /**
+ * Get the value class for the map output data. If it is not set, use the
+ * (final) output value class This allows the map output value class to be
+ * different than the final output value class.
+ *
+ * @return the map output value class.
+ */
+ public Class<?> getMapOutputValueClass() {
+ return conf.getMapOutputValueClass();
+ }
+
+ /**
+ * Get the user-specified job name. This is only used to identify the
+ * job to the user.
+ *
+ * @return the job's name, defaulting to "".
+ */
+ public String getJobName() {
+ return conf.getJobName();
+ }
+
+ /**
+ * Get the {@link InputFormat} class for the job.
+ *
+ * @return the {@link InputFormat} class for the job.
+ */
+ @SuppressWarnings("unchecked")
+ public Class<? extends InputFormat<?,?>> getInputFormatClass()
+ throws ClassNotFoundException {
+ return (Class<? extends InputFormat<?,?>>)
+ conf.getClass(INPUT_FORMAT_CLASS_ATTR, TextInputFormat.class);
+ }
+
+ /**
+ * Get the {@link Mapper} class for the job.
+ *
+ * @return the {@link Mapper} class for the job.
+ */
+ @SuppressWarnings("unchecked")
+ public Class<? extends Mapper<?,?,?,?>> getMapperClass()
+ throws ClassNotFoundException {
+ return (Class<? extends Mapper<?,?,?,?>>)
+ conf.getClass(MAP_CLASS_ATTR, Mapper.class);
+ }
+
+ /**
+ * Get the combiner class for the job.
+ *
+ * @return the combiner class for the job.
+ */
+ @SuppressWarnings("unchecked")
+ public Class<? extends Reducer<?,?,?,?>> getCombinerClass()
+ throws ClassNotFoundException {
+ return (Class<? extends Reducer<?,?,?,?>>)
+ conf.getClass(COMBINE_CLASS_ATTR, null);
+ }
+
+ /**
+ * Get the {@link Reducer} class for the job.
+ *
+ * @return the {@link Reducer} class for the job.
+ */
+ @SuppressWarnings("unchecked")
+ public Class<? extends Reducer<?,?,?,?>> getReducerClass()
+ throws ClassNotFoundException {
+ return (Class<? extends Reducer<?,?,?,?>>)
+ conf.getClass(REDUCE_CLASS_ATTR, Reducer.class);
+ }
+
+ /**
+ * Get the {@link OutputFormat} class for the job.
+ *
+ * @return the {@link OutputFormat} class for the job.
+ */
+ @SuppressWarnings("unchecked")
+ public Class<? extends OutputFormat<?,?>> getOutputFormatClass()
+ throws ClassNotFoundException {
+ return (Class<? extends OutputFormat<?,?>>)
+ conf.getClass(OUTPUT_FORMAT_CLASS_ATTR, TextOutputFormat.class);
+ }
+
+ /**
+ * Get the {@link Partitioner} class for the job.
+ *
+ * @return the {@link Partitioner} class for the job.
+ */
+ @SuppressWarnings("unchecked")
+ public Class<? extends Partitioner<?,?>> getPartitionerClass()
+ throws ClassNotFoundException {
+ return (Class<? extends Partitioner<?,?>>)
+ conf.getClass(PARTITIONER_CLASS_ATTR, HashPartitioner.class);
+ }
+
+ /**
+ * Get the {@link RawComparator} comparator used to compare keys.
+ *
+ * @return the {@link RawComparator} comparator used to compare keys.
+ */
+ public RawComparator<?> getSortComparator() {
+ return conf.getOutputKeyComparator();
+ }
+
+ /**
+ * Get the pathname of the job's jar.
+ * @return the pathname
+ */
+ public String getJar() {
+ return conf.getJar();
+ }
+
+ /**
+ * Get the user defined {@link RawComparator} comparator for
+ * grouping keys of inputs to the reduce.
+ *
+ * @return comparator set by the user for grouping values.
+ * @see Job#setGroupingComparatorClass(Class) for details.
+ */
+ public RawComparator<?> getGroupingComparator() {
+ return conf.getOutputValueGroupingComparator();
+ }
+
+ /**
+ * Get whether job-setup and job-cleanup is needed for the job
+ *
+ * @return boolean
+ */
+ public boolean getJobSetupCleanupNeeded() {
+ return conf.getBoolean(MRJobConfig.SETUP_CLEANUP_NEEDED, true);
+ }
+
+ /**
+ * Get whether task-cleanup is needed for the job
+ *
+ * @return boolean
+ */
+ public boolean getTaskCleanupNeeded() {
+ return conf.getBoolean(MRJobConfig.TASK_CLEANUP_NEEDED, true);
+ }
+
+ /**
+ * This method checks to see if symlinks are to be create for the
+ * localized cache files in the current working directory
+ * @return true if symlinks are to be created- else return false
+ */
+ public boolean getSymlink() {
+ return DistributedCache.getSymlink(conf);
+ }
+
+ /**
+ * Get the archive entries in classpath as an array of Path
+ */
+ public Path[] getArchiveClassPaths() {
+ return DistributedCache.getArchiveClassPaths(conf);
+ }
+
+ /**
+ * Get cache archives set in the Configuration
+ * @return A URI array of the caches set in the Configuration
+ * @throws IOException
+ */
+ public URI[] getCacheArchives() throws IOException {
+ return DistributedCache.getCacheArchives(conf);
+ }
+
+ /**
+ * Get cache files set in the Configuration
+ * @return A URI array of the files set in the Configuration
+ * @throws IOException
+ */
+
+ public URI[] getCacheFiles() throws IOException {
+ return DistributedCache.getCacheFiles(conf);
+ }
+
+ /**
+ * Return the path array of the localized caches
+ * @return A path array of localized caches
+ * @throws IOException
+ */
+ public Path[] getLocalCacheArchives()
+ throws IOException {
+ return DistributedCache.getLocalCacheArchives(conf);
+ }
+
+ /**
+ * Return the path array of the localized files
+ * @return A path array of localized files
+ * @throws IOException
+ */
+ public Path[] getLocalCacheFiles()
+ throws IOException {
+ return DistributedCache.getLocalCacheFiles(conf);
+ }
+
+ /**
+ * Get the file entries in classpath as an array of Path
+ */
+ public Path[] getFileClassPaths() {
+ return DistributedCache.getFileClassPaths(conf);
+ }
+
+ /**
+ * Get the timestamps of the archives. Used by internal
+ * DistributedCache and MapReduce code.
+ * @return a string array of timestamps
+ * @throws IOException
+ */
+ public String[] getArchiveTimestamps() {
+ return DistributedCache.getArchiveTimestamps(conf);
+ }
+
+ /**
+ * Get the timestamps of the files. Used by internal
+ * DistributedCache and MapReduce code.
+ * @return a string array of timestamps
+ * @throws IOException
+ */
+ public String[] getFileTimestamps() {
+ return DistributedCache.getFileTimestamps(conf);
+ }
+
+ /**
+ * Get the configured number of maximum attempts that will be made to run a
+ * map task, as specified by the <code>mapred.map.max.attempts</code>
+ * property. If this property is not already set, the default is 4 attempts.
+ *
+ * @return the max number of attempts per map task.
+ */
+ public int getMaxMapAttempts() {
+ return conf.getMaxMapAttempts();
+ }
+
+ /**
+ * Get the configured number of maximum attempts that will be made to run a
+ * reduce task, as specified by the <code>mapred.reduce.max.attempts</code>
+ * property. If this property is not already set, the default is 4 attempts.
+ *
+ * @return the max number of attempts per reduce task.
+ */
+ public int getMaxReduceAttempts() {
+ return conf.getMaxReduceAttempts();
+ }
+
+ /**
+ * Get whether the task profiling is enabled.
+ * @return true if some tasks will be profiled
+ */
+ public boolean getProfileEnabled() {
+ return conf.getProfileEnabled();
+ }
+
+ /**
+ * Get the profiler configuration arguments.
+ *
+ * The default value for this property is
+ * "-agentlib:hprof=cpu=samples,heap=sites,force=n,thread=y,verbose=n,file=%s"
+ *
+ * @return the parameters to pass to the task child to configure profiling
+ */
+ public String getProfileParams() {
+ return conf.getProfileParams();
+ }
+
+ /**
+ * Get the range of maps or reduces to profile.
+ * @param isMap is the task a map?
+ * @return the task ranges
+ */
+ public IntegerRanges getProfileTaskRange(boolean isMap) {
+ return conf.getProfileTaskRange(isMap);
+ }
+
+ /**
+ * Get the reported username for this job.
+ *
+ * @return the username
+ */
+ public String getUser() {
+ return conf.getUser();
+ }
+
+ public Credentials getCredentials() {
+ return credentials;
+ }
+
+ @Override
+ public JobConf getJobConf() {
+ return conf;
+ }
+
+ @Override
+ public Progressable getProgressible() {
+ return progress;
+ }
+
+}
Propchange: incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/mapreduce/JobContextImpl.java
------------------------------------------------------------------------------
svn:eol-style = native