You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2013/09/23 19:45:17 UTC
[03/20] TEZ-444. Rename *.new* packages back to what they should be,
remove dead code from the old packages - mapreduce module (part of
TEZ-398). (sseth)
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/3d609458/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/newprocessor/MRTask.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/newprocessor/MRTask.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/newprocessor/MRTask.java
deleted file mode 100644
index d71dba0..0000000
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/newprocessor/MRTask.java
+++ /dev/null
@@ -1,731 +0,0 @@
-/**
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements. See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership. The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-package org.apache.tez.mapreduce.newprocessor;
-
-import java.io.IOException;
-import java.net.URI;
-import java.text.NumberFormat;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import javax.crypto.SecretKey;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.classification.InterfaceAudience.Private;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.LocalDirAllocator;
-import org.apache.hadoop.fs.FileSystem.Statistics;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.DataInputBuffer;
-import org.apache.hadoop.io.RawComparator;
-import org.apache.hadoop.mapred.FileAlreadyExistsException;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.JobContext;
-import org.apache.hadoop.mapred.MapOutputFile;
-import org.apache.hadoop.mapred.RawKeyValueIterator;
-import org.apache.hadoop.mapred.TaskAttemptContext;
-import org.apache.hadoop.mapred.TaskAttemptID;
-import org.apache.hadoop.mapred.TaskID;
-import org.apache.hadoop.mapreduce.OutputCommitter;
-import org.apache.hadoop.mapreduce.TaskType;
-import org.apache.hadoop.mapreduce.filecache.DistributedCache;
-import org.apache.hadoop.mapreduce.lib.reduce.WrappedReducer;
-import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
-import org.apache.hadoop.mapreduce.task.ReduceContextImpl;
-import org.apache.hadoop.security.Credentials;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.util.Progress;
-import org.apache.hadoop.util.StringUtils;
-import org.apache.hadoop.util.DiskChecker.DiskErrorException;
-import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
-import org.apache.hadoop.yarn.util.ResourceCalculatorProcessTree;
-import org.apache.tez.common.Constants;
-import org.apache.tez.common.TezJobConfig;
-import org.apache.tez.common.TezTaskStatus.State;
-import org.apache.tez.common.TezUtils;
-import org.apache.tez.common.counters.TaskCounter;
-import org.apache.tez.common.counters.TezCounter;
-import org.apache.tez.common.counters.TezCounters;
-import org.apache.tez.dag.records.TezDAGID;
-import org.apache.tez.engine.common.security.JobTokenIdentifier;
-import org.apache.tez.engine.common.security.TokenCache;
-import org.apache.tez.engine.common.sort.impl.TezRawKeyValueIterator;
-import org.apache.tez.engine.newapi.LogicalOutput;
-import org.apache.tez.engine.newapi.TezProcessorContext;
-import org.apache.tez.engine.records.OutputContext;
-import org.apache.tez.mapreduce.hadoop.DeprecatedKeys;
-import org.apache.tez.mapreduce.hadoop.IDConverter;
-import org.apache.tez.mapreduce.hadoop.MRConfig;
-import org.apache.tez.mapreduce.hadoop.MRJobConfig;
-import org.apache.tez.mapreduce.hadoop.newmapred.TaskAttemptContextImpl;
-import org.apache.tez.mapreduce.hadoop.mapreduce.JobContextImpl;
-import org.apache.tez.mapreduce.newoutput.SimpleOutput;
-//import org.apache.tez.mapreduce.partition.MRPartitioner;
-import org.apache.tez.mapreduce.task.impl.YarnOutputFiles;
-
-@SuppressWarnings("deprecation")
-public abstract class MRTask {
-
- static final Log LOG = LogFactory.getLog(MRTask.class);
-
- protected JobConf jobConf;
- protected JobContext jobContext;
- protected TaskAttemptContext taskAttemptContext;
- protected OutputCommitter committer;
-
- // Current counters
- transient TezCounters counters;
- protected GcTimeUpdater gcUpdater;
- private ResourceCalculatorProcessTree pTree;
- private long initCpuCumulativeTime = 0;
- protected TezProcessorContext processorContext;
- protected TaskAttemptID taskAttemptId;
- protected Progress progress = new Progress();
- protected SecretKey jobTokenSecret;
-
- boolean isMap;
-
- /* flag to track whether task is done */
- AtomicBoolean taskDone = new AtomicBoolean(false);
-
- /** Construct output file names so that, when an output directory listing is
- * sorted lexicographically, positions correspond to output partitions.*/
- private static final NumberFormat NUMBER_FORMAT = NumberFormat.getInstance();
- static {
- NUMBER_FORMAT.setMinimumIntegerDigits(5);
- NUMBER_FORMAT.setGroupingUsed(false);
- }
-
- protected MRTaskReporter mrReporter;
- protected boolean useNewApi;
-
- /**
- * A Map where Key-> URIScheme and value->FileSystemStatisticUpdater
- */
- private Map<String, FileSystemStatisticUpdater> statisticUpdaters =
- new HashMap<String, FileSystemStatisticUpdater>();
-
- public MRTask(boolean isMap) {
- this.isMap = isMap;
- }
-
- // TODO how to update progress
- public void initialize(TezProcessorContext context) throws IOException,
- InterruptedException {
-
- DeprecatedKeys.init();
-
- processorContext = context;
- counters = context.getCounters();
- this.taskAttemptId = new TaskAttemptID(
- new TaskID(
- Long.toString(context.getApplicationId().getClusterTimestamp()),
- context.getApplicationId().getId(),
- (isMap ? TaskType.MAP : TaskType.REDUCE),
- context.getTaskIndex()),
- context.getTaskAttemptNumber());
- // TODO TEZAM4 Figure out initialization / run sequence of Input, Process,
- // Output. Phase is MR specific.
- gcUpdater = new GcTimeUpdater(counters);
-
- byte[] userPayload = context.getUserPayload();
- Configuration conf = TezUtils.createConfFromUserPayload(userPayload);
- if (conf instanceof JobConf) {
- this.jobConf = (JobConf)conf;
- } else {
- this.jobConf = new JobConf(conf);
- }
- jobConf.set(Constants.TEZ_ENGINE_TASK_ATTEMPT_ID,
- taskAttemptId.toString());
- jobConf.setInt(MRJobConfig.APPLICATION_ATTEMPT_ID,
- context.getDAGAttemptNumber());
-
- initResourceCalculatorPlugin();
-
- LOG.info("MRTask.inited: taskAttemptId = " + taskAttemptId.toString());
-
- // TODO Post MRR
- // A single file per vertex will likely be a better solution. Does not
- // require translation - client can take care of this. Will work independent
- // of whether the configuration is for intermediate tasks or not. Has the
- // overhead of localizing multiple files per job - i.e. the client would
- // need to write these files to hdfs, add them as local resources per
- // vertex. A solution like this may be more practical once it's possible to
- // submit configuration parameters to the AM and effectively tasks via RPC.
-
- jobConf.set(MRJobConfig.VERTEX_NAME, processorContext.getTaskVertexName());
-
- if (LOG.isDebugEnabled() && userPayload != null) {
- Iterator<Entry<String, String>> iter = jobConf.iterator();
- String taskIdStr = taskAttemptId.getTaskID().toString();
- while (iter.hasNext()) {
- Entry<String, String> confEntry = iter.next();
- LOG.debug("TaskConf Entry"
- + ", taskId=" + taskIdStr
- + ", key=" + confEntry.getKey()
- + ", value=" + confEntry.getValue());
- }
- }
-
- configureMRTask();
- }
-
- private void configureMRTask()
- throws IOException, InterruptedException {
-
- Credentials credentials = UserGroupInformation.getCurrentUser()
- .getCredentials();
- jobConf.setCredentials(credentials);
- // TODO Can this be avoided all together. Have the MRTezOutputCommitter use
- // the Tez parameter.
- // TODO This could be fetched from the env if YARN is setting it for all
- // Containers.
- // Set it in conf, so as to be able to be used the the OutputCommitter.
-
- jobConf.setClass(MRConfig.TASK_LOCAL_OUTPUT_CLASS, YarnOutputFiles.class,
- MapOutputFile.class); // MR
-
- // Not needed. This is probably being set via the source/consumer meta
- Token<JobTokenIdentifier> jobToken = TokenCache.getJobToken(credentials);
- if (jobToken != null) {
- // Will MR ever run without a job token.
- SecretKey sk = JobTokenSecretManager.createSecretKey(jobToken
- .getPassword());
- this.jobTokenSecret = sk;
- } else {
- LOG.warn("No job token set");
- }
-
- configureLocalDirs();
-
- if (jobConf.get(TezJobConfig.DAG_CREDENTIALS_BINARY) != null) {
- jobConf.set(MRJobConfig.MAPREDUCE_JOB_CREDENTIALS_BINARY,
- jobConf.get(TezJobConfig.DAG_CREDENTIALS_BINARY));
- }
-
- // Set up the DistributedCache related configs
- setupDistributedCacheConfig(jobConf);
- }
-
- private void configureLocalDirs() throws IOException {
- // TODO NEWTEZ Is most of this functionality required ?
- jobConf.setStrings(TezJobConfig.LOCAL_DIRS, processorContext.getWorkDirs());
- jobConf.set(TezJobConfig.TASK_LOCAL_RESOURCE_DIR, System.getenv(Environment.PWD.name()));
-
- jobConf.setStrings(MRConfig.LOCAL_DIR, processorContext.getWorkDirs());
-
- LocalDirAllocator lDirAlloc = new LocalDirAllocator(TezJobConfig.LOCAL_DIRS);
- Path workDir = null;
- // First, try to find the JOB_LOCAL_DIR on this host.
- try {
- workDir = lDirAlloc.getLocalPathToRead("work", jobConf);
- } catch (DiskErrorException e) {
- // DiskErrorException means dir not found. If not found, it will
- // be created below.
- }
- if (workDir == null) {
- // JOB_LOCAL_DIR doesn't exist on this host -- Create it.
- workDir = lDirAlloc.getLocalPathForWrite("work", jobConf);
- FileSystem lfs = FileSystem.getLocal(jobConf).getRaw();
- boolean madeDir = false;
- try {
- madeDir = lfs.mkdirs(workDir);
- } catch (FileAlreadyExistsException e) {
- // Since all tasks will be running in their own JVM, the race condition
- // exists where multiple tasks could be trying to create this directory
- // at the same time. If this task loses the race, it's okay because
- // the directory already exists.
- madeDir = true;
- workDir = lDirAlloc.getLocalPathToRead("work", jobConf);
- }
- if (!madeDir) {
- throw new IOException("Mkdirs failed to create "
- + workDir.toString());
- }
- }
- // TODO NEWTEZ Is this required ?
- jobConf.set(TezJobConfig.JOB_LOCAL_DIR, workDir.toString());
- jobConf.set(MRJobConfig.JOB_LOCAL_DIR, workDir.toString());
- }
-
- /**
- * Set up the DistributedCache related configs to make
- * {@link DistributedCache#getLocalCacheFiles(Configuration)} and
- * {@link DistributedCache#getLocalCacheArchives(Configuration)} working.
- *
- * @param job
- * @throws IOException
- */
- private static void setupDistributedCacheConfig(final JobConf job)
- throws IOException {
-
- String localWorkDir = (job.get(TezJobConfig.TASK_LOCAL_RESOURCE_DIR));
- // ^ ^ all symlinks are created in the current work-dir
-
- // Update the configuration object with localized archives.
- URI[] cacheArchives = DistributedCache.getCacheArchives(job);
- if (cacheArchives != null) {
- List<String> localArchives = new ArrayList<String>();
- for (int i = 0; i < cacheArchives.length; ++i) {
- URI u = cacheArchives[i];
- Path p = new Path(u);
- Path name = new Path((null == u.getFragment()) ? p.getName()
- : u.getFragment());
- String linkName = name.toUri().getPath();
- localArchives.add(new Path(localWorkDir, linkName).toUri().getPath());
- }
- if (!localArchives.isEmpty()) {
- job.set(MRJobConfig.CACHE_LOCALARCHIVES, StringUtils
- .arrayToString(localArchives.toArray(new String[localArchives
- .size()])));
- }
- }
-
- // Update the configuration object with localized files.
- URI[] cacheFiles = DistributedCache.getCacheFiles(job);
- if (cacheFiles != null) {
- List<String> localFiles = new ArrayList<String>();
- for (int i = 0; i < cacheFiles.length; ++i) {
- URI u = cacheFiles[i];
- Path p = new Path(u);
- Path name = new Path((null == u.getFragment()) ? p.getName()
- : u.getFragment());
- String linkName = name.toUri().getPath();
- localFiles.add(new Path(localWorkDir, linkName).toUri().getPath());
- }
- if (!localFiles.isEmpty()) {
- job.set(MRJobConfig.CACHE_LOCALFILES, StringUtils
- .arrayToString(localFiles.toArray(new String[localFiles.size()])));
- }
- }
- }
-
-
- private void initResourceCalculatorPlugin() {
- Class<? extends ResourceCalculatorProcessTree> clazz =
- this.jobConf.getClass(MRConfig.RESOURCE_CALCULATOR_PROCESS_TREE,
- null, ResourceCalculatorProcessTree.class);
- pTree = ResourceCalculatorProcessTree
- .getResourceCalculatorProcessTree(System.getenv().get("JVM_PID"), clazz, this.jobConf);
- LOG.info(" Using ResourceCalculatorProcessTree : " + pTree);
- if (pTree != null) {
- pTree.updateProcessTree();
- initCpuCumulativeTime = pTree.getCumulativeCpuTime();
- }
- }
-
- public TezProcessorContext getUmbilical() {
- return this.processorContext;
- }
-
- public void initTask() throws IOException,
- InterruptedException {
- this.mrReporter = new MRTaskReporter(processorContext);
- this.useNewApi = jobConf.getUseNewMapper();
- TezDAGID dagId = IDConverter.fromMRTaskAttemptId(taskAttemptId).getTaskID()
- .getVertexID().getDAGId();
-
- this.jobContext = new JobContextImpl(jobConf, dagId, mrReporter);
- this.taskAttemptContext =
- new TaskAttemptContextImpl(jobConf, taskAttemptId, mrReporter);
-
- if (getState() == State.UNASSIGNED) {
- setState(State.RUNNING);
- }
-
-// combineProcessor = null;
-// boolean useCombiner = false;
-// if (useNewApi) {
-// try {
-// useCombiner = (taskAttemptContext.getCombinerClass() != null);
-// } catch (ClassNotFoundException e) {
-// throw new IOException("Could not find combiner class", e);
-// }
-// } else {
-// useCombiner = (job.getCombinerClass() != null);
-// }
-// if (useCombiner) {
-// combineProcessor = new MRCombiner(this);
-// combineProcessor.initialize(job, getTaskReporter());
-// } else {
-// }
-
- localizeConfiguration(jobConf);
- }
-
-// public void initPartitioner(JobConf job) throws IOException,
-// InterruptedException {
-// partitioner = new MRPartitioner(this);
-// ((MRPartitioner) partitioner).initialize(job, getTaskReporter());
-// }
-
- public MRTaskReporter getMRReporter() {
- return mrReporter;
- }
-
- public void setState(State state) {
- // TODO Auto-generated method stub
-
- }
-
- public State getState() {
- // TODO Auto-generated method stub
- return null;
- }
-
- public OutputCommitter getCommitter() {
- return committer;
- }
-
- public void setCommitter(OutputCommitter committer) {
- this.committer = committer;
- }
-
- public TezCounters getCounters() { return counters; }
-
- public void setConf(JobConf jobConf) {
- this.jobConf = jobConf;
- }
-
- public JobConf getConf() {
- return this.jobConf;
- }
-
- /**
- * Gets a handle to the Statistics instance based on the scheme associated
- * with path.
- *
- * @param path the path.
- * @param conf the configuration to extract the scheme from if not part of
- * the path.
- * @return a Statistics instance, or null if none is found for the scheme.
- */
- @Private
- public static List<Statistics> getFsStatistics(Path path, Configuration conf) throws IOException {
- List<Statistics> matchedStats = new ArrayList<FileSystem.Statistics>();
- path = path.getFileSystem(conf).makeQualified(path);
- String scheme = path.toUri().getScheme();
- for (Statistics stats : FileSystem.getAllStatistics()) {
- if (stats.getScheme().equals(scheme)) {
- matchedStats.add(stats);
- }
- }
- return matchedStats;
- }
-
- @Private
- public synchronized String getOutputName() {
- return "part-" + NUMBER_FORMAT.format(taskAttemptId.getTaskID().getId());
- }
-
- public void waitBeforeCompletion(MRTaskReporter reporter) throws IOException,
- InterruptedException {
- }
-
- public void outputReady(MRTaskReporter reporter, OutputContext outputContext)
- throws IOException,
- InterruptedException {
- LOG.info("Task: " + taskAttemptId + " reporting outputReady");
- updateCounters();
- statusUpdate();
- }
-
- public void done(LogicalOutput output) throws IOException, InterruptedException {
- updateCounters();
-
- LOG.info("Task:" + taskAttemptId + " is done."
- + " And is in the process of committing");
- // TODO change this to use the new context
- // TODO TEZ Interaciton between Commit and OutputReady. Merge ?
- if (output instanceof SimpleOutput) {
- SimpleOutput sOut = (SimpleOutput)output;
- if (sOut.isCommitRequired()) {
- //wait for commit approval and commit
- // TODO EVENTUALLY - Commit is not required for map tasks.
- // skip a couple of RPCs before exiting.
- commit(sOut);
- }
- }
- taskDone.set(true);
- // Make sure we send at least one set of counter increments. It's
- // ok to call updateCounters() in this thread after comm thread stopped.
- updateCounters();
- sendLastUpdate();
- //signal the tasktracker that we are done
- //sendDone(umbilical);
- }
-
- /**
- * Send a status update to the task tracker
- * @param umbilical
- * @throws IOException
- */
- public void statusUpdate() throws IOException, InterruptedException {
- // TODO call progress update here if not being called within Map/Reduce
- }
-
- /**
- * Sends last status update before sending umbilical.done();
- */
- private void sendLastUpdate()
- throws IOException, InterruptedException {
- statusUpdate();
- }
-
- private void commit(SimpleOutput output) throws IOException {
- int retries = 3;
- while (true) {
- // This will loop till the AM asks for the task to be killed. As
- // against, the AM sending a signal to the task to kill itself
- // gracefully.
- try {
- if (processorContext.canCommit()) {
- break;
- }
- Thread.sleep(1000);
- } catch(InterruptedException ie) {
- //ignore
- } catch (IOException ie) {
- LOG.warn("Failure sending canCommit: "
- + StringUtils.stringifyException(ie));
- if (--retries == 0) {
- throw ie;
- }
- }
- }
-
- // task can Commit now
- try {
- LOG.info("Task " + taskAttemptId + " is allowed to commit now");
- output.commit();
- return;
- } catch (IOException iee) {
- LOG.warn("Failure committing: " +
- StringUtils.stringifyException(iee));
- //if it couldn't commit a successfully then delete the output
- discardOutput(output);
- throw iee;
- }
- }
-
- private
- void discardOutput(SimpleOutput output) {
- try {
- output.abort();
- } catch (IOException ioe) {
- LOG.warn("Failure cleaning up: " +
- StringUtils.stringifyException(ioe));
- }
- }
-
-
- public void updateCounters() {
- // TODO Auto-generated method stub
- // TODO TEZAM Implement.
- Map<String, List<FileSystem.Statistics>> map = new
- HashMap<String, List<FileSystem.Statistics>>();
- for(Statistics stat: FileSystem.getAllStatistics()) {
- String uriScheme = stat.getScheme();
- if (map.containsKey(uriScheme)) {
- List<FileSystem.Statistics> list = map.get(uriScheme);
- list.add(stat);
- } else {
- List<FileSystem.Statistics> list = new ArrayList<FileSystem.Statistics>();
- list.add(stat);
- map.put(uriScheme, list);
- }
- }
- for (Map.Entry<String, List<FileSystem.Statistics>> entry: map.entrySet()) {
- FileSystemStatisticUpdater updater = statisticUpdaters.get(entry.getKey());
- if(updater==null) {//new FileSystem has been found in the cache
- updater =
- new FileSystemStatisticUpdater(counters, entry.getValue(),
- entry.getKey());
- statisticUpdaters.put(entry.getKey(), updater);
- }
- updater.updateCounters();
- }
-
- gcUpdater.incrementGcCounter();
- updateResourceCounters();
- }
-
- /**
- * Updates the {@link TaskCounter#COMMITTED_HEAP_BYTES} counter to reflect the
- * current total committed heap space usage of this JVM.
- */
- private void updateHeapUsageCounter() {
- long currentHeapUsage = Runtime.getRuntime().totalMemory();
- counters.findCounter(TaskCounter.COMMITTED_HEAP_BYTES)
- .setValue(currentHeapUsage);
- }
-
- /**
- * Update resource information counters
- */
- void updateResourceCounters() {
- // Update generic resource counters
- updateHeapUsageCounter();
-
- // Updating resources specified in ResourceCalculatorPlugin
- if (pTree == null) {
- return;
- }
- pTree.updateProcessTree();
- long cpuTime = pTree.getCumulativeCpuTime();
- long pMem = pTree.getCumulativeRssmem();
- long vMem = pTree.getCumulativeVmem();
- // Remove the CPU time consumed previously by JVM reuse
- cpuTime -= initCpuCumulativeTime;
- counters.findCounter(TaskCounter.CPU_MILLISECONDS).setValue(cpuTime);
- counters.findCounter(TaskCounter.PHYSICAL_MEMORY_BYTES).setValue(pMem);
- counters.findCounter(TaskCounter.VIRTUAL_MEMORY_BYTES).setValue(vMem);
- }
-
-
- public static String normalizeStatus(String status, Configuration conf) {
- // Check to see if the status string is too long
- // and truncate it if needed.
- int progressStatusLength = conf.getInt(
- MRConfig.PROGRESS_STATUS_LEN_LIMIT_KEY,
- MRConfig.PROGRESS_STATUS_LEN_LIMIT_DEFAULT);
- if (status.length() > progressStatusLength) {
- LOG.warn("Task status: \"" + status + "\" truncated to max limit ("
- + progressStatusLength + " characters)");
- status = status.substring(0, progressStatusLength);
- }
- return status;
- }
-
- protected static <INKEY,INVALUE,OUTKEY,OUTVALUE>
- org.apache.hadoop.mapreduce.Reducer<INKEY,INVALUE,OUTKEY,OUTVALUE>.Context
- createReduceContext(org.apache.hadoop.mapreduce.Reducer
- <INKEY,INVALUE,OUTKEY,OUTVALUE> reducer,
- Configuration job,
- TaskAttemptID taskId,
- final TezRawKeyValueIterator rIter,
- org.apache.hadoop.mapreduce.Counter inputKeyCounter,
- org.apache.hadoop.mapreduce.Counter inputValueCounter,
- org.apache.hadoop.mapreduce.RecordWriter<OUTKEY,OUTVALUE> output,
- org.apache.hadoop.mapreduce.OutputCommitter committer,
- org.apache.hadoop.mapreduce.StatusReporter reporter,
- RawComparator<INKEY> comparator,
- Class<INKEY> keyClass, Class<INVALUE> valueClass
- ) throws IOException, InterruptedException {
- RawKeyValueIterator r =
- new RawKeyValueIterator() {
-
- @Override
- public boolean next() throws IOException {
- return rIter.next();
- }
-
- @Override
- public DataInputBuffer getValue() throws IOException {
- return rIter.getValue();
- }
-
- @Override
- public Progress getProgress() {
- return rIter.getProgress();
- }
-
- @Override
- public DataInputBuffer getKey() throws IOException {
- return rIter.getKey();
- }
-
- @Override
- public void close() throws IOException {
- rIter.close();
- }
- };
- org.apache.hadoop.mapreduce.ReduceContext<INKEY, INVALUE, OUTKEY, OUTVALUE>
- reduceContext =
- new ReduceContextImpl<INKEY, INVALUE, OUTKEY, OUTVALUE>(
- job,
- taskId,
- r,
- inputKeyCounter,
- inputValueCounter,
- output,
- committer,
- reporter,
- comparator,
- keyClass,
- valueClass);
- if (LOG.isDebugEnabled()) {
- LOG.debug("Using key class: " + keyClass
- + ", valueClass: " + valueClass);
- }
-
- org.apache.hadoop.mapreduce.Reducer<INKEY,INVALUE,OUTKEY,OUTVALUE>.Context
- reducerContext =
- new WrappedReducer<INKEY, INVALUE, OUTKEY, OUTVALUE>().getReducerContext(
- reduceContext);
-
- return reducerContext;
- }
-
- public void taskCleanup()
- throws IOException, InterruptedException {
- // set phase for this task
- statusUpdate();
- LOG.info("Runnning cleanup for the task");
- // do the cleanup
- committer.abortTask(taskAttemptContext);
- }
-
- public void localizeConfiguration(JobConf jobConf)
- throws IOException, InterruptedException {
- jobConf.set(JobContext.TASK_ID, taskAttemptId.getTaskID().toString());
- jobConf.set(JobContext.TASK_ATTEMPT_ID, taskAttemptId.toString());
- jobConf.setInt(JobContext.TASK_PARTITION,
- taskAttemptId.getTaskID().getId());
- jobConf.set(JobContext.ID, taskAttemptId.getJobID().toString());
- }
-
- public abstract TezCounter getOutputRecordsCounter();
-
- public abstract TezCounter getInputRecordsCounter();
-
- public org.apache.hadoop.mapreduce.TaskAttemptContext getTaskAttemptContext() {
- return taskAttemptContext;
- }
-
- public JobContext getJobContext() {
- return jobContext;
- }
-
- public TaskAttemptID getTaskAttemptId() {
- return taskAttemptId;
- }
-
- public TezProcessorContext getTezEngineTaskContext() {
- return processorContext;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/3d609458/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/newprocessor/MRTaskReporter.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/newprocessor/MRTaskReporter.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/newprocessor/MRTaskReporter.java
deleted file mode 100644
index c7c9567..0000000
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/newprocessor/MRTaskReporter.java
+++ /dev/null
@@ -1,122 +0,0 @@
-/**
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements. See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership. The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-package org.apache.tez.mapreduce.newprocessor;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.mapred.Counters;
-import org.apache.hadoop.mapred.InputSplit;
-import org.apache.hadoop.mapred.Reporter;
-import org.apache.tez.common.counters.TezCounter;
-import org.apache.tez.engine.newapi.TezInputContext;
-import org.apache.tez.engine.newapi.TezOutputContext;
-import org.apache.tez.engine.newapi.TezProcessorContext;
-import org.apache.tez.engine.newapi.TezTaskContext;
-import org.apache.tez.mapreduce.hadoop.mapred.MRCounters;
-import org.apache.tez.mapreduce.hadoop.newmapred.MRReporter;
-
-@InterfaceAudience.Private
-@InterfaceStability.Unstable
-public class MRTaskReporter
- extends org.apache.hadoop.mapreduce.StatusReporter
- implements Reporter {
-
- private final TezTaskContext context;
- private final boolean isProcessorContext;
- private final Reporter reporter;
-
- private InputSplit split = null;
-
- public MRTaskReporter(TezProcessorContext context) {
- this.context = context;
- this.reporter = new MRReporter(context);
- this.isProcessorContext = true;
- }
-
- public MRTaskReporter(TezOutputContext context) {
- this.context = context;
- this.reporter = new MRReporter(context);
- this.isProcessorContext = false;
- }
-
- public MRTaskReporter(TezInputContext context) {
- this.context= context;
- this.reporter = new MRReporter(context);
- this.isProcessorContext = false;
- }
-
- public void setProgress(float progress) {
- if (isProcessorContext) {
- ((TezProcessorContext)context).setProgress(progress);
- } else {
- // TODO FIXME NEWTEZ - will simpleoutput's reporter use this api?
- }
- }
-
- public void setStatus(String status) {
- reporter.setStatus(status);
- }
-
- public float getProgress() {
- return reporter.getProgress();
- };
-
- public void progress() {
- reporter.progress();
- }
-
- public Counters.Counter getCounter(String group, String name) {
- TezCounter counter = context.getCounters().findCounter(group, name);
- MRCounters.MRCounter mrCounter = null;
- if (counter != null) {
- mrCounter = new MRCounters.MRCounter(counter);
- }
- return mrCounter;
- }
-
- public Counters.Counter getCounter(Enum<?> name) {
- TezCounter counter = context.getCounters().findCounter(name);
- MRCounters.MRCounter mrCounter = null;
- if (counter != null) {
- mrCounter = new MRCounters.MRCounter(counter);
- }
- return mrCounter;
- }
-
- public void incrCounter(Enum<?> key, long amount) {
- reporter.incrCounter(key, amount);
- }
-
- public void incrCounter(String group, String counter, long amount) {
- reporter.incrCounter(group, counter, amount);
- }
-
- public void setInputSplit(InputSplit split) {
- this.split = split;
- }
-
- public InputSplit getInputSplit() throws UnsupportedOperationException {
- if (split == null) {
- throw new UnsupportedOperationException("Input only available on map");
- } else {
- return split;
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/3d609458/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/newprocessor/map/MapProcessor.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/newprocessor/map/MapProcessor.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/newprocessor/map/MapProcessor.java
deleted file mode 100644
index 21df743..0000000
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/newprocessor/map/MapProcessor.java
+++ /dev/null
@@ -1,341 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.tez.mapreduce.newprocessor.map;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.JobContext;
-import org.apache.hadoop.mapred.MapRunnable;
-import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.hadoop.mapred.RecordReader;
-import org.apache.hadoop.mapred.Reporter;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.lib.map.WrappedMapper;
-import org.apache.hadoop.util.ReflectionUtils;
-import org.apache.tez.common.counters.TaskCounter;
-import org.apache.tez.common.counters.TezCounter;
-import org.apache.tez.dag.api.TezException;
-import org.apache.tez.engine.lib.output.OnFileSortedOutput;
-import org.apache.tez.engine.newapi.Event;
-import org.apache.tez.engine.newapi.KVReader;
-import org.apache.tez.engine.newapi.KVWriter;
-import org.apache.tez.engine.newapi.LogicalIOProcessor;
-import org.apache.tez.engine.newapi.LogicalInput;
-import org.apache.tez.engine.newapi.LogicalOutput;
-import org.apache.tez.engine.newapi.TezProcessorContext;
-import org.apache.tez.mapreduce.hadoop.newmapreduce.MapContextImpl;
-import org.apache.tez.mapreduce.newinput.SimpleInput;
-import org.apache.tez.mapreduce.newinput.SimpleInputLegacy;
-import org.apache.tez.mapreduce.newoutput.SimpleOutput;
-import org.apache.tez.mapreduce.newprocessor.MRTask;
-import org.apache.tez.mapreduce.newprocessor.MRTaskReporter;
-
-@SuppressWarnings({ "unchecked", "rawtypes" })
-public class MapProcessor extends MRTask implements LogicalIOProcessor {
-
- private static final Log LOG = LogFactory.getLog(MapProcessor.class);
-
- public MapProcessor(){
- super(true);
- }
-
- @Override
- public void initialize(TezProcessorContext processorContext)
- throws IOException {
- try {
- super.initialize(processorContext);
- } catch (InterruptedException e) {
- throw new IOException(e);
- }
- }
-
-
- @Override
- public void handleEvents(List<Event> processorEvents) {
- // TODO Auto-generated method stub
-
- }
-
- public void close() throws IOException {
- // TODO Auto-generated method stub
-
- }
-
- @Override
- public void run(Map<String, LogicalInput> inputs,
- Map<String, LogicalOutput> outputs) throws Exception {
-
- LOG.info("Running map: " + processorContext.getUniqueIdentifier());
-
- initTask();
-
- if (inputs.size() != 1
- || outputs.size() != 1) {
- throw new IOException("Cannot handle multiple inputs or outputs"
- + ", inputCount=" + inputs.size()
- + ", outputCount=" + outputs.size());
- }
- LogicalInput in = inputs.values().iterator().next();
- LogicalOutput out = outputs.values().iterator().next();
-
- // Sanity check
- if (!(in instanceof SimpleInputLegacy)) {
- throw new IOException(new TezException(
- "Only Simple Input supported. Input: " + in.getClass()));
- }
- SimpleInputLegacy input = (SimpleInputLegacy)in;
-
- KVWriter kvWriter = null;
- if (!(out instanceof OnFileSortedOutput)) {
- kvWriter = ((SimpleOutput)out).getWriter();
- } else {
- kvWriter = ((OnFileSortedOutput)out).getWriter();
- }
-
- if (useNewApi) {
- runNewMapper(jobConf, mrReporter, input, kvWriter);
- } else {
- runOldMapper(jobConf, mrReporter, input, kvWriter);
- }
-
- done(out);
- }
-
- void runOldMapper(
- final JobConf job,
- final MRTaskReporter reporter,
- final SimpleInputLegacy input,
- final KVWriter output
- ) throws IOException, InterruptedException {
-
- // Initialize input in-line since it sets parameters which may be used by the processor.
- // Done only for SimpleInput.
- // TODO use new method in SimpleInput to get required info
- //input.initialize(job, master);
-
- RecordReader in = new OldRecordReader(input);
-
- OutputCollector collector = new OldOutputCollector(output);
-
- MapRunnable runner =
- (MapRunnable)ReflectionUtils.newInstance(job.getMapRunnerClass(), job);
-
- runner.run(in, collector, (Reporter)reporter);
- // start the sort phase only if there are reducers
- this.statusUpdate();
- }
-
- private void runNewMapper(final JobConf job,
- MRTaskReporter reporter,
- final SimpleInputLegacy in,
- KVWriter out
- ) throws IOException, InterruptedException {
-
- // Initialize input in-line since it sets parameters which may be used by the processor.
- // Done only for SimpleInput.
- // TODO use new method in SimpleInput to get required info
- //in.initialize(job, master);
-
- // make a task context so we can get the classes
- org.apache.hadoop.mapreduce.TaskAttemptContext taskContext =
- getTaskAttemptContext();
-
- // make a mapper
- org.apache.hadoop.mapreduce.Mapper mapper;
- try {
- mapper = (org.apache.hadoop.mapreduce.Mapper)
- ReflectionUtils.newInstance(taskContext.getMapperClass(), job);
- } catch (ClassNotFoundException cnfe) {
- throw new IOException(cnfe);
- }
-
- org.apache.hadoop.mapreduce.RecordReader input =
- new NewRecordReader(in);
-
- org.apache.hadoop.mapreduce.RecordWriter output =
- new NewOutputCollector(out);
-
- org.apache.hadoop.mapreduce.InputSplit split = in.getNewInputSplit();
-
- org.apache.hadoop.mapreduce.MapContext
- mapContext =
- new MapContextImpl(
- job, taskAttemptId,
- input, output,
- getCommitter(),
- processorContext, split);
-
- org.apache.hadoop.mapreduce.Mapper.Context mapperContext =
- new WrappedMapper().getMapContext(mapContext);
-
- input.initialize(split, mapperContext);
- mapper.run(mapperContext);
- this.statusUpdate();
- input.close();
- output.close(mapperContext);
- }
-
- private static class NewRecordReader extends
- org.apache.hadoop.mapreduce.RecordReader {
- private final SimpleInput in;
- private KVReader reader;
-
- private NewRecordReader(SimpleInput in) throws IOException {
- this.in = in;
- this.reader = in.getReader();
- }
-
- @Override
- public void initialize(org.apache.hadoop.mapreduce.InputSplit split,
- TaskAttemptContext context) throws IOException,
- InterruptedException {
- //in.initializeNewRecordReader(split, context);
- }
-
- @Override
- public boolean nextKeyValue() throws IOException,
- InterruptedException {
- return reader.next();
- }
-
- @Override
- public Object getCurrentKey() throws IOException,
- InterruptedException {
- return reader.getCurrentKV().getKey();
- }
-
- @Override
- public Object getCurrentValue() throws IOException,
- InterruptedException {
- return reader.getCurrentKV().getValues().iterator().next();
- }
-
- @Override
- public float getProgress() throws IOException, InterruptedException {
- return in.getProgress();
- }
-
- @Override
- public void close() throws IOException {
- }
- }
-
- private static class OldRecordReader implements RecordReader {
- private final SimpleInputLegacy simpleInput;
-
- private OldRecordReader(SimpleInputLegacy simpleInput) {
- this.simpleInput = simpleInput;
- }
-
- @Override
- public boolean next(Object key, Object value) throws IOException {
- // TODO broken
-// simpleInput.setKey(key);
-// simpleInput.setValue(value);
-// try {
-// return simpleInput.hasNext();
-// } catch (InterruptedException ie) {
-// throw new IOException(ie);
-// }
- return simpleInput.getOldRecordReader().next(key, value);
- }
-
- @Override
- public Object createKey() {
- return simpleInput.getOldRecordReader().createKey();
- }
-
- @Override
- public Object createValue() {
- return simpleInput.getOldRecordReader().createValue();
- }
-
- @Override
- public long getPos() throws IOException {
- return simpleInput.getOldRecordReader().getPos();
- }
-
- @Override
- public void close() throws IOException {
- }
-
- @Override
- public float getProgress() throws IOException {
- try {
- return simpleInput.getProgress();
- } catch (InterruptedException ie) {
- throw new IOException(ie);
- }
- }
- }
-
- private static class OldOutputCollector
- implements OutputCollector {
- private final KVWriter output;
-
- OldOutputCollector(KVWriter output) {
- this.output = output;
- }
-
- public void collect(Object key, Object value) throws IOException {
- output.write(key, value);
- }
- }
-
- private class NewOutputCollector
- extends org.apache.hadoop.mapreduce.RecordWriter {
- private final KVWriter out;
-
- NewOutputCollector(KVWriter out) throws IOException {
- this.out = out;
- }
-
- @Override
- public void write(Object key, Object value) throws IOException, InterruptedException {
- out.write(key, value);
- }
-
- @Override
- public void close(TaskAttemptContext context
- ) throws IOException, InterruptedException {
- }
- }
-
- @Override
- public void localizeConfiguration(JobConf jobConf)
- throws IOException, InterruptedException {
- super.localizeConfiguration(jobConf);
- jobConf.setBoolean(JobContext.TASK_ISMAP, true);
- }
-
- @Override
- public TezCounter getOutputRecordsCounter() {
- return processorContext.getCounters().findCounter(TaskCounter.MAP_OUTPUT_RECORDS);
- }
-
- @Override
- public TezCounter getInputRecordsCounter() {
- return processorContext.getCounters().findCounter(TaskCounter.MAP_INPUT_RECORDS);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/3d609458/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/newprocessor/reduce/ReduceProcessor.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/newprocessor/reduce/ReduceProcessor.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/newprocessor/reduce/ReduceProcessor.java
deleted file mode 100644
index cedcdd6..0000000
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/newprocessor/reduce/ReduceProcessor.java
+++ /dev/null
@@ -1,353 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.tez.mapreduce.newprocessor.reduce;
-
-import java.io.IOException;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.io.DataInputBuffer;
-import org.apache.hadoop.io.RawComparator;
-import org.apache.hadoop.mapred.Counters.Counter;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.JobContext;
-import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.hadoop.mapred.Reducer;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.util.Progress;
-import org.apache.hadoop.util.Progressable;
-import org.apache.hadoop.util.ReflectionUtils;
-import org.apache.tez.common.counters.TaskCounter;
-import org.apache.tez.common.counters.TezCounter;
-import org.apache.tez.engine.common.ConfigUtils;
-import org.apache.tez.engine.common.sort.impl.TezRawKeyValueIterator;
-import org.apache.tez.engine.lib.output.OnFileSortedOutput;
-import org.apache.tez.engine.newapi.Event;
-import org.apache.tez.engine.newapi.KVReader;
-import org.apache.tez.engine.newapi.KVWriter;
-import org.apache.tez.engine.newapi.LogicalIOProcessor;
-import org.apache.tez.engine.newapi.LogicalInput;
-import org.apache.tez.engine.newapi.LogicalOutput;
-import org.apache.tez.engine.newapi.TezProcessorContext;
-import org.apache.tez.mapreduce.newinput.ShuffledMergedInputLegacy;
-import org.apache.tez.mapreduce.newoutput.SimpleOutput;
-import org.apache.tez.mapreduce.newprocessor.MRTask;
-import org.apache.tez.mapreduce.newprocessor.MRTaskReporter;
-
-
-@SuppressWarnings({ "unchecked", "rawtypes" })
-public class ReduceProcessor
-extends MRTask
-implements LogicalIOProcessor {
-
- private static final Log LOG = LogFactory.getLog(ReduceProcessor.class);
-
- private Counter reduceInputKeyCounter;
- private Counter reduceInputValueCounter;
-
- public ReduceProcessor() {
- super(false);
- }
-
- @Override
- public void initialize(TezProcessorContext processorContext)
- throws IOException {
- try {
- super.initialize(processorContext);
- } catch (InterruptedException e) {
- throw new IOException(e);
- }
- }
-
-
- @Override
- public void handleEvents(List<Event> processorEvents) {
- // TODO Auto-generated method stub
-
- }
-
- public void close() throws IOException {
- // TODO Auto-generated method stub
-
- }
-
- @Override
- public void run(Map<String, LogicalInput> inputs,
- Map<String, LogicalOutput> outputs) throws Exception {
-
- LOG.info("Running reduce: " + processorContext.getUniqueIdentifier());
-
- initTask();
-
- if (outputs.size() <= 0 || outputs.size() > 1) {
- throw new IOException("Invalid number of outputs"
- + ", outputCount=" + outputs.size());
- }
-
- if (inputs.size() <= 0 || inputs.size() > 1) {
- throw new IOException("Invalid number of inputs"
- + ", inputCount=" + inputs.size());
- }
-
- LogicalInput in = inputs.values().iterator().next();
- LogicalOutput out = outputs.values().iterator().next();
-
- this.statusUpdate();
-
- Class keyClass = ConfigUtils.getIntermediateInputKeyClass(jobConf);
- Class valueClass = ConfigUtils.getIntermediateInputValueClass(jobConf);
- LOG.info("Using keyClass: " + keyClass);
- LOG.info("Using valueClass: " + valueClass);
- RawComparator comparator =
- ConfigUtils.getInputKeySecondaryGroupingComparator(jobConf);
- LOG.info("Using comparator: " + comparator);
-
- reduceInputKeyCounter =
- mrReporter.getCounter(TaskCounter.REDUCE_INPUT_GROUPS);
- reduceInputValueCounter =
- mrReporter.getCounter(TaskCounter.REDUCE_INPUT_RECORDS);
-
- // Sanity check
- if (!(in instanceof ShuffledMergedInputLegacy)) {
- throw new IOException("Illegal input to reduce: " + in.getClass());
- }
- ShuffledMergedInputLegacy shuffleInput = (ShuffledMergedInputLegacy)in;
- KVReader kvReader = shuffleInput.getReader();
-
- KVWriter kvWriter = null;
- if((out instanceof SimpleOutput)) {
- kvWriter = ((SimpleOutput) out).getWriter();
- } else if ((out instanceof OnFileSortedOutput)) {
- kvWriter = ((OnFileSortedOutput) out).getWriter();
- } else {
- throw new IOException("Illegal input to reduce: " + in.getClass());
- }
-
- if (useNewApi) {
- try {
- runNewReducer(
- jobConf,
- mrReporter,
- shuffleInput, comparator, keyClass, valueClass,
- kvWriter);
- } catch (ClassNotFoundException cnfe) {
- throw new IOException(cnfe);
- }
- } else {
- runOldReducer(
- jobConf, mrReporter,
- kvReader, comparator, keyClass, valueClass, kvWriter);
- }
-
- done(out);
- }
-
- void runOldReducer(JobConf job,
- final MRTaskReporter reporter,
- KVReader input,
- RawComparator comparator,
- Class keyClass,
- Class valueClass,
- final KVWriter output) throws IOException, InterruptedException {
-
- Reducer reducer =
- ReflectionUtils.newInstance(job.getReducerClass(), job);
-
- // make output collector
-
- OutputCollector collector =
- new OutputCollector() {
- public void collect(Object key, Object value)
- throws IOException {
- output.write(key, value);
- }
- };
-
- // apply reduce function
- try {
- ReduceValuesIterator values =
- new ReduceValuesIterator(
- input, reporter, reduceInputValueCounter);
-
- values.informReduceProgress();
- while (values.more()) {
- reduceInputKeyCounter.increment(1);
- reducer.reduce(values.getKey(), values, collector, reporter);
- values.informReduceProgress();
- }
-
- //Clean up: repeated in catch block below
- reducer.close();
- //End of clean up.
- } catch (IOException ioe) {
- try {
- reducer.close();
- } catch (IOException ignored) {
- }
-
- throw ioe;
- }
- }
-
- private static class ReduceValuesIterator<KEY,VALUE>
- implements Iterator<VALUE> {
- private Counter reduceInputValueCounter;
- private KVReader in;
- private Progressable reporter;
- private Object currentKey;
- private Iterator<Object> currentValues;
-
- public ReduceValuesIterator (KVReader in,
- Progressable reporter,
- Counter reduceInputValueCounter)
- throws IOException {
- this.reduceInputValueCounter = reduceInputValueCounter;
- this.in = in;
- this.reporter = reporter;
- }
-
- public boolean more() throws IOException {
- boolean more = in.next();
- if(more) {
- currentKey = in.getCurrentKV().getKey();
- currentValues = in.getCurrentKV().getValues().iterator();
- } else {
- currentKey = null;
- currentValues = null;
- }
- return more;
- }
-
- public KEY getKey() throws IOException {
- return (KEY) currentKey;
- }
-
- public void informReduceProgress() {
- reporter.progress();
- }
-
- @Override
- public boolean hasNext() {
- return currentValues.hasNext();
- }
-
- @Override
- public VALUE next() {
- reduceInputValueCounter.increment(1);
- return (VALUE) currentValues.next();
- }
-
- @Override
- public void remove() {
- throw new UnsupportedOperationException();
- }
-
- }
-
- void runNewReducer(JobConf job,
- final MRTaskReporter reporter,
- ShuffledMergedInputLegacy input,
- RawComparator comparator,
- Class keyClass,
- Class valueClass,
- final KVWriter out
- ) throws IOException,InterruptedException,
- ClassNotFoundException {
-
- // make a task context so we can get the classes
- org.apache.hadoop.mapreduce.TaskAttemptContext taskContext = getTaskAttemptContext();
-
- // make a reducer
- org.apache.hadoop.mapreduce.Reducer reducer =
- (org.apache.hadoop.mapreduce.Reducer)
- ReflectionUtils.newInstance(taskContext.getReducerClass(), job);
-
- // wrap value iterator to report progress.
- final TezRawKeyValueIterator rawIter = input.getIterator();
- TezRawKeyValueIterator rIter = new TezRawKeyValueIterator() {
- public void close() throws IOException {
- rawIter.close();
- }
- public DataInputBuffer getKey() throws IOException {
- return rawIter.getKey();
- }
- public Progress getProgress() {
- return rawIter.getProgress();
- }
- public DataInputBuffer getValue() throws IOException {
- return rawIter.getValue();
- }
- public boolean next() throws IOException {
- boolean ret = rawIter.next();
- reporter.setProgress(rawIter.getProgress().getProgress());
- return ret;
- }
- };
-
- org.apache.hadoop.mapreduce.RecordWriter trackedRW =
- new org.apache.hadoop.mapreduce.RecordWriter() {
-
- @Override
- public void write(Object key, Object value) throws IOException,
- InterruptedException {
- out.write(key, value);
- }
-
- @Override
- public void close(TaskAttemptContext context) throws IOException,
- InterruptedException {
- }
- };
-
- org.apache.hadoop.mapreduce.Reducer.Context reducerContext =
- createReduceContext(
- reducer, job, taskAttemptId,
- rIter, reduceInputKeyCounter,
- reduceInputValueCounter,
- trackedRW,
- committer,
- reporter, comparator, keyClass,
- valueClass);
-
-
-
- reducer.run(reducerContext);
- trackedRW.close(reducerContext);
- }
-
- @Override
- public void localizeConfiguration(JobConf jobConf)
- throws IOException, InterruptedException {
- super.localizeConfiguration(jobConf);
- jobConf.setBoolean(JobContext.TASK_ISMAP, false);
- }
-
- @Override
- public TezCounter getOutputRecordsCounter() {
- return processorContext.getCounters().findCounter(TaskCounter.REDUCE_OUTPUT_RECORDS);
- }
-
- @Override
- public TezCounter getInputRecordsCounter() {
- return processorContext.getCounters().findCounter(TaskCounter.REDUCE_INPUT_GROUPS);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/3d609458/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/SimpleOutput.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/SimpleOutput.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/SimpleOutput.java
index 060e28c..91fb8cc 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/SimpleOutput.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/SimpleOutput.java
@@ -1,181 +1,226 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
package org.apache.tez.mapreduce.output;
import java.io.IOException;
+import java.text.NumberFormat;
import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.FileSystem.Statistics;
+import org.apache.hadoop.mapred.FileOutputCommitter;
+import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.JobContext;
+import org.apache.hadoop.mapred.TaskAttemptID;
+import org.apache.hadoop.mapred.TaskID;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormatCounter;
import org.apache.hadoop.util.ReflectionUtils;
-import org.apache.tez.common.TezEngineTaskContext;
+import org.apache.tez.common.TezUtils;
+import org.apache.tez.common.counters.TaskCounter;
import org.apache.tez.common.counters.TezCounter;
-import org.apache.tez.engine.api.Master;
-import org.apache.tez.engine.api.Output;
-import org.apache.tez.engine.records.OutputContext;
-import org.apache.tez.mapreduce.processor.MRTask;
+import org.apache.tez.engine.newapi.Event;
+import org.apache.tez.engine.newapi.KVWriter;
+import org.apache.tez.engine.newapi.LogicalOutput;
+import org.apache.tez.engine.newapi.TezOutputContext;
+import org.apache.tez.mapreduce.common.Utils;
+import org.apache.tez.mapreduce.hadoop.MRConfig;
+import org.apache.tez.mapreduce.hadoop.MRJobConfig;
+import org.apache.tez.mapreduce.hadoop.mapred.MRReporter;
+import org.apache.tez.mapreduce.hadoop.mapreduce.TaskAttemptContextImpl;
import org.apache.tez.mapreduce.processor.MRTaskReporter;
-/**
- * {@link SimpleOutput} is an {@link Output} which persists key/value pairs
- * written to it.
- *
- * It is compatible with all standard Apache Hadoop MapReduce
- * {@link OutputFormat} implementations.
- */
-@SuppressWarnings({ "unchecked", "rawtypes" })
-public class SimpleOutput implements Output {
-
- private MRTask task;
-
+public class SimpleOutput implements LogicalOutput {
+
+ private static final Log LOG = LogFactory.getLog(SimpleOutput.class);
+
+ private static final NumberFormat NUMBER_FORMAT = NumberFormat.getInstance();
+ static {
+ NUMBER_FORMAT.setMinimumIntegerDigits(5);
+ NUMBER_FORMAT.setGroupingUsed(false);
+ }
+
+ private TezOutputContext outputContext;
+ private JobConf jobConf;
boolean useNewApi;
- JobConf jobConf;
-
- org.apache.hadoop.mapreduce.TaskAttemptContext taskAttemptContext;
+ private AtomicBoolean closed = new AtomicBoolean(false);
+
+ @SuppressWarnings("rawtypes")
org.apache.hadoop.mapreduce.OutputFormat newOutputFormat;
+ @SuppressWarnings("rawtypes")
org.apache.hadoop.mapreduce.RecordWriter newRecordWriter;
-
+
+ @SuppressWarnings("rawtypes")
org.apache.hadoop.mapred.OutputFormat oldOutputFormat;
+ @SuppressWarnings("rawtypes")
org.apache.hadoop.mapred.RecordWriter oldRecordWriter;
-
+
private TezCounter outputRecordCounter;
- private TezCounter fileOutputByteCounter;
+ private TezCounter fileOutputByteCounter;
private List<Statistics> fsStats;
- private MRTaskReporter reporter;
-
- public SimpleOutput(TezEngineTaskContext task)
- {}
-
- public void setTask(MRTask task) {
- this.task = task;
- }
-
- public void initialize(Configuration conf, Master master) throws IOException,
- InterruptedException {
- if (task == null) {
- return;
- }
-
- if (conf instanceof JobConf) {
- jobConf = (JobConf)conf;
- } else {
- jobConf = new JobConf(conf);
- }
-
- useNewApi = jobConf.getUseNewMapper();
- taskAttemptContext = task.getTaskAttemptContext();
-
- outputRecordCounter = task.getOutputRecordsCounter();
- fileOutputByteCounter = task.getFileOutputBytesCounter();
-
- reporter = task.getMRReporter();
-
+ private TaskAttemptContext newApiTaskAttemptContext;
+ private org.apache.hadoop.mapred.TaskAttemptContext oldApiTaskAttemptContext;
+
+ private boolean isMapperOutput;
+
+ private OutputCommitter committer;
+
+ @Override
+ public List<Event> initialize(TezOutputContext outputContext)
+ throws IOException, InterruptedException {
+ LOG.info("Initializing Simple Output");
+ this.outputContext = outputContext;
+ Configuration conf = TezUtils.createConfFromUserPayload(
+ outputContext.getUserPayload());
+ this.jobConf = new JobConf(conf);
+ this.useNewApi = this.jobConf.getUseNewMapper();
+ this.isMapperOutput = jobConf.getBoolean(MRConfig.IS_MAP_PROCESSOR,
+ false);
+ jobConf.setInt(MRJobConfig.APPLICATION_ATTEMPT_ID,
+ outputContext.getDAGAttemptNumber());
+
+ outputRecordCounter = outputContext.getCounters().findCounter(
+ TaskCounter.MAP_OUTPUT_RECORDS);
+ fileOutputByteCounter = outputContext.getCounters().findCounter(
+ FileOutputFormatCounter.BYTES_WRITTEN);
+
if (useNewApi) {
+ newApiTaskAttemptContext = createTaskAttemptContext();
try {
newOutputFormat =
ReflectionUtils.newInstance(
- taskAttemptContext.getOutputFormatClass(), jobConf);
+ newApiTaskAttemptContext.getOutputFormatClass(), jobConf);
} catch (ClassNotFoundException cnfe) {
throw new IOException(cnfe);
}
-
+
List<Statistics> matchedStats = null;
- if (newOutputFormat instanceof
+ if (newOutputFormat instanceof
org.apache.hadoop.mapreduce.lib.output.FileOutputFormat) {
- matchedStats =
- MRTask.getFsStatistics(
+ matchedStats =
+ Utils.getFsStatistics(
org.apache.hadoop.mapreduce.lib.output.FileOutputFormat
- .getOutputPath(taskAttemptContext),
+ .getOutputPath(newApiTaskAttemptContext),
jobConf);
}
fsStats = matchedStats;
long bytesOutPrev = getOutputBytes();
- newRecordWriter =
- newOutputFormat.getRecordWriter(this.taskAttemptContext);
+ try {
+ newRecordWriter =
+ newOutputFormat.getRecordWriter(newApiTaskAttemptContext);
+ } catch (InterruptedException e) {
+ throw new IOException("Interrupted while creating record writer", e);
+ }
long bytesOutCurr = getOutputBytes();
fileOutputByteCounter.increment(bytesOutCurr - bytesOutPrev);
} else {
+ TaskAttemptID taskAttemptId = new TaskAttemptID(new TaskID(Long.toString(
+ outputContext.getApplicationId().getClusterTimestamp()),
+ outputContext.getApplicationId().getId(),
+ (isMapperOutput ? TaskType.MAP : TaskType.REDUCE),
+ outputContext.getTaskIndex()),
+ outputContext.getTaskAttemptNumber());
+ jobConf.set(JobContext.TASK_ATTEMPT_ID, taskAttemptId.toString());
+ jobConf.set(JobContext.TASK_ID, taskAttemptId.getTaskID().toString());
+ jobConf.setBoolean(JobContext.TASK_ISMAP, isMapperOutput);
+ jobConf.setInt(JobContext.TASK_PARTITION,
+ taskAttemptId.getTaskID().getId());
+ jobConf.set(JobContext.ID, taskAttemptId.getJobID().toString());
+
+ oldApiTaskAttemptContext =
+ new org.apache.tez.mapreduce.hadoop.mapred.TaskAttemptContextImpl(
+ jobConf, taskAttemptId,
+ new MRTaskReporter(outputContext));
oldOutputFormat = jobConf.getOutputFormat();
-
+
List<Statistics> matchedStats = null;
- if (oldOutputFormat instanceof org.apache.hadoop.mapred.FileOutputFormat) {
- matchedStats =
- MRTask.getFsStatistics(
+ if (oldOutputFormat
+ instanceof org.apache.hadoop.mapred.FileOutputFormat) {
+ matchedStats =
+ Utils.getFsStatistics(
org.apache.hadoop.mapred.FileOutputFormat.getOutputPath(
- jobConf),
+ jobConf),
jobConf);
}
fsStats = matchedStats;
FileSystem fs = FileSystem.get(jobConf);
- String finalName = task.getOutputName();
+ String finalName = getOutputName();
long bytesOutPrev = getOutputBytes();
- oldRecordWriter =
+ oldRecordWriter =
oldOutputFormat.getRecordWriter(
- fs, jobConf, finalName, reporter);
+ fs, jobConf, finalName, new MRReporter(outputContext));
long bytesOutCurr = getOutputBytes();
fileOutputByteCounter.increment(bytesOutCurr - bytesOutPrev);
}
+ initCommitter(jobConf, useNewApi);
+
+ LOG.info("Initialized Simple Output"
+ + ", using_new_api: " + useNewApi);
+ return null;
}
-
- public void write(Object key, Object value)
+
+ public void initCommitter(JobConf job, boolean useNewApi)
throws IOException, InterruptedException {
- reporter.progress();
- long bytesOutPrev = getOutputBytes();
-
if (useNewApi) {
- newRecordWriter.write(key, value);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("using new api for output committer");
+ }
+
+ OutputFormat<?, ?> outputFormat = null;
+ try {
+ outputFormat = ReflectionUtils.newInstance(
+ newApiTaskAttemptContext.getOutputFormatClass(), job);
+ } catch (ClassNotFoundException cnfe) {
+ throw new IOException("Unknown OutputFormat", cnfe);
+ }
+ this.committer = outputFormat.getOutputCommitter(
+ newApiTaskAttemptContext);
} else {
- oldRecordWriter.write(key, value);
+ this.committer = job.getOutputCommitter();
}
-
- long bytesOutCurr = getOutputBytes();
- fileOutputByteCounter.increment(bytesOutCurr - bytesOutPrev);
- outputRecordCounter.increment(1);
+ Path outputPath = FileOutputFormat.getOutputPath(job);
+ if (outputPath != null) {
+ if ((this.committer instanceof FileOutputCommitter)) {
+ FileOutputFormat.setWorkOutputPath(job,
+ ((FileOutputCommitter) this.committer).getTaskAttemptPath(
+ oldApiTaskAttemptContext));
+ } else {
+ FileOutputFormat.setWorkOutputPath(job, outputPath);
+ }
+ }
+ if (useNewApi) {
+ this.committer.setupTask(newApiTaskAttemptContext);
+ } else {
+ this.committer.setupTask(oldApiTaskAttemptContext);
+ }
}
- public void close() throws IOException, InterruptedException {
- reporter.progress();
- long bytesOutPrev = getOutputBytes();
+ public boolean isCommitRequired() throws IOException {
if (useNewApi) {
- newRecordWriter.close(taskAttemptContext);
+ return committer.needsTaskCommit(newApiTaskAttemptContext);
} else {
- oldRecordWriter.close(null);
+ return committer.needsTaskCommit(oldApiTaskAttemptContext);
}
- long bytesOutCurr = getOutputBytes();
- fileOutputByteCounter.increment(bytesOutCurr - bytesOutPrev);
}
- public org.apache.hadoop.mapreduce.OutputFormat getNewOutputFormat() {
- return newOutputFormat;
- }
-
- public org.apache.hadoop.mapred.OutputFormat getOldOutputFormat() {
- return oldOutputFormat;
+ private TaskAttemptContext createTaskAttemptContext() {
+ return new TaskAttemptContextImpl(this.jobConf, outputContext,
+ isMapperOutput);
}
-
+
private long getOutputBytes() {
if (fsStats == null) return 0;
long bytesWritten = 0;
@@ -185,9 +230,97 @@ public class SimpleOutput implements Output {
return bytesWritten;
}
+ /**
+ * Output file name for this task: "part-" followed by the task index
+ * rendered through NUMBER_FORMAT.
+ */
+ private String getOutputName() {
+ String formattedIndex = NUMBER_FORMAT.format(outputContext.getTaskIndex());
+ return "part-".concat(formattedIndex);
+ }
+
+ /**
+ * Returns a writer that forwards each key/value pair to the record writer
+ * of the configured API and updates the byte and record counters.
+ * An interrupt while writing through the new-API writer is rethrown as an
+ * IOException after the thread's interrupt status is restored.
+ */
+ @Override
+ public KVWriter getWriter() throws IOException {
+ return new KVWriter() {
+ // API flavour captured when this writer is handed out.
+ private final boolean newApiWriter = useNewApi;
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public void write(Object key, Object value) throws IOException {
+ final long bytesBefore = getOutputBytes();
+ if (!newApiWriter) {
+ oldRecordWriter.write(key, value);
+ } else {
+ try {
+ newRecordWriter.write(key, value);
+ } catch (InterruptedException ie) {
+ Thread.currentThread().interrupt();
+ throw new IOException("Interrupted while writing next key-value", ie);
+ }
+ }
+ final long bytesAfter = getOutputBytes();
+ fileOutputByteCounter.increment(bytesAfter - bytesBefore);
+ outputRecordCounter.increment(1);
+ }
+ };
+ }
+
+ /**
+ * Accepts events routed to this output. None are expected at the moment,
+ * so the list is ignored.
+ */
+ @Override
+ public void handleEvents(List<Event> outputEvents) {
+ // Intentionally a no-op.
+ }
+
+ /**
+ * Closes the underlying record writer at most once and adds any bytes
+ * written while closing to the file-output byte counter. Later calls
+ * return immediately.
+ * @return always null; no events are generated on close
+ * @throws IOException if closing the record writer fails, or if the thread
+ * is interrupted while closing a new-API record writer (the interrupt
+ * status is restored before throwing)
+ */
@Override
- public OutputContext getOutputContext() {
+ public synchronized List<Event> close() throws IOException {
+ if (closed.getAndSet(true)) {
+ return null;
+ }
+
+ LOG.info("Closing Simple Output");
+ long bytesOutPrev = getOutputBytes();
+ if (useNewApi) {
+ try {
+ newRecordWriter.close(newApiTaskAttemptContext);
+ } catch (InterruptedException e) {
+ // Restore the interrupt status, as KVWriter.write() in getWriter() does.
+ Thread.currentThread().interrupt();
+ throw new IOException("Interrupted while closing record writer", e);
+ }
+ } else {
+ oldRecordWriter.close(null);
+ }
+ long bytesOutCurr = getOutputBytes();
+ fileOutputByteCounter.increment(bytesOutCurr - bytesOutPrev);
+ LOG.info("Closed Simple Output");
return null;
}
+ /**
+ * Receives the number of physical outputs. SimpleOutput does not use this
+ * value yet.
+ */
+ @Override
+ public void setNumPhysicalOutputs(int numOutputs) {
+ // Intentionally a no-op for now.
+ }
+
+ /**
+ * Closes this output, then commits the task attempt's output through the
+ * committer. SimpleOutput expects a Processor to call commit prior to the
+ * Processor's completion.
+ * @throws IOException if closing the writer or committing the task fails;
+ * when close fails, commitTask is not invoked
+ */
+ public void commit() throws IOException {
+ close();
+ if (!useNewApi) {
+ committer.commitTask(oldApiTaskAttemptContext);
+ } else {
+ committer.commitTask(newApiTaskAttemptContext);
+ }
+ }
+
+
+ /**
+ * SimpleOutput expects that a Processor call abort in case of any error
+ * ( including an error during commit ) prior to the Processor's completion.
+ * The committer's abortTask is invoked even when closing the record writer
+ * fails; a close failure is logged and rethrown after the abort.
+ * @throws IOException if closing the writer or aborting the task fails
+ */
+ public void abort() throws IOException {
+ IOException closeFailure = null;
+ try {
+ close();
+ } catch (IOException e) {
+ // Don't skip the committer abort because the writer failed to close.
+ LOG.warn("Error closing Simple Output during abort", e);
+ closeFailure = e;
+ }
+ if (useNewApi) {
+ committer.abortTask(newApiTaskAttemptContext);
+ } else {
+ committer.abortTask(oldApiTaskAttemptContext);
+ }
+ if (closeFailure != null) {
+ throw closeFailure;
+ }
+ }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/3d609458/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/partition/MRPartitioner.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/partition/MRPartitioner.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/partition/MRPartitioner.java
index 61dfcd1..d061ad5 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/partition/MRPartitioner.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/partition/MRPartitioner.java
@@ -1,70 +1,54 @@
/**
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements. See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership. The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package org.apache.tez.mapreduce.partition;
-import java.io.IOException;
-
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.Partitioner;
import org.apache.hadoop.util.ReflectionUtils;
-import org.apache.tez.engine.api.Master;
-import org.apache.tez.mapreduce.processor.MRTask;
+import org.apache.tez.common.TezJobConfig;
+import org.apache.tez.engine.common.ConfigUtils;
+import org.apache.tez.mapreduce.hadoop.MRJobConfig;
-@SuppressWarnings({"rawtypes", "unchecked"})
+@SuppressWarnings({ "rawtypes", "unchecked" })
public class MRPartitioner implements org.apache.tez.engine.api.Partitioner {
static final Log LOG = LogFactory.getLog(MRPartitioner.class);
- private final MRTask task;
-
- JobConf jobConf;
- boolean useNewApi;
-
- org.apache.hadoop.mapred.Partitioner oldPartitioner;
- org.apache.hadoop.mapreduce.Partitioner newPartitioner;
- public MRPartitioner(MRTask task) {
- this.task = task;
- }
-
- public void initialize(Configuration conf, Master master)
- throws IOException, InterruptedException {
- if (conf instanceof JobConf) {
- jobConf = (JobConf)conf;
- } else {
- jobConf = new JobConf(conf);
- }
-
- useNewApi = jobConf.getUseNewMapper();
- final int partitions = this.task.getTezEngineTaskContext()
- .getOutputSpecList().get(0).getNumOutputs();
+ private final boolean useNewApi;
+ private int partitions = 1;
+
+ private org.apache.hadoop.mapreduce.Partitioner newPartitioner;
+ private org.apache.hadoop.mapred.Partitioner oldPartitioner;
+
+ public MRPartitioner(Configuration conf) {
+ this.useNewApi = ConfigUtils.useNewApi(conf);
+ this.partitions = conf.getInt(TezJobConfig.TEZ_ENGINE_NUM_EXPECTED_PARTITIONS, 1);
+
if (useNewApi) {
if (partitions > 1) {
- try {
- newPartitioner = (org.apache.hadoop.mapreduce.Partitioner)
- ReflectionUtils.newInstance(
- task.getJobContext().getPartitionerClass(), jobConf);
- } catch (ClassNotFoundException cnfe) {
- throw new IOException(cnfe);
- }
+ newPartitioner = (org.apache.hadoop.mapreduce.Partitioner) ReflectionUtils
+ .newInstance(
+ (Class<? extends org.apache.hadoop.mapreduce.Partitioner<?, ?>>) conf
+ .getClass(MRJobConfig.PARTITIONER_CLASS_ATTR,
+ org.apache.hadoop.mapreduce.lib.partition.HashPartitioner.class), conf);
} else {
newPartitioner = new org.apache.hadoop.mapreduce.Partitioner() {
@Override
@@ -75,24 +59,24 @@ public class MRPartitioner implements org.apache.tez.engine.api.Partitioner {
}
} else {
if (partitions > 1) {
- oldPartitioner = (Partitioner)
- ReflectionUtils.newInstance(jobConf.getPartitionerClass(), jobConf);
+ oldPartitioner = (org.apache.hadoop.mapred.Partitioner) ReflectionUtils.newInstance(
+ (Class<? extends org.apache.hadoop.mapred.Partitioner>) conf.getClass(
+ "mapred.partitioner.class", org.apache.hadoop.mapred.lib.HashPartitioner.class), conf);
} else {
- oldPartitioner = new Partitioner() {
+ oldPartitioner = new org.apache.hadoop.mapred.Partitioner() {
@Override
- public void configure(JobConf job) {}
-
+ public void configure(JobConf job) {
+ }
+
@Override
public int getPartition(Object key, Object value, int numPartitions) {
return numPartitions - 1;
}
};
}
-
}
-
}
-
+
@Override
public int getPartition(Object key, Object value, int numPartitions) {
if (useNewApi) {
@@ -101,5 +85,4 @@ public class MRPartitioner implements org.apache.tez.engine.api.Partitioner {
return oldPartitioner.getPartition(key, value, numPartitions);
}
}
-
-}
+}
\ No newline at end of file