You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by hi...@apache.org on 2013/04/19 01:54:28 UTC
svn commit: r1469642 [34/36] - in /incubator/tez/branches/TEZ-1: ./
example_jobs/ example_jobs/sampleInput/ example_jobs/wc_mr_6m_1r/
example_jobs/wc_mrr_6m_3r_3r/ ljr_helper/ tez-common/ tez-common/src/
tez-common/src/main/ tez-common/src/main/java/ t...
Added: incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/MRTaskReporter.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/MRTaskReporter.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/MRTaskReporter.java (added)
+++ incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/MRTaskReporter.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,111 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.mapreduce.processor;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.mapred.Counters;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.tez.common.TezTaskReporter;
+import org.apache.tez.common.counters.TezCounter;
+import org.apache.tez.mapreduce.hadoop.mapred.MRCounters;
+
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+/**
+ * Adapter that exposes a Tez task reporter through both the old
+ * ({@link Reporter}) and the new
+ * ({@link org.apache.hadoop.mapreduce.StatusReporter}) MapReduce reporting
+ * APIs. Every call is forwarded to the wrapped {@link TezTaskReporterImpl};
+ * the only state kept here is the input split.
+ */
+public class MRTaskReporter
+    extends org.apache.hadoop.mapreduce.StatusReporter
+    implements Reporter {
+
+  /** Underlying Tez reporter that receives every forwarded call. */
+  private final TezTaskReporterImpl reporter;
+
+  /** Split returned by {@link #getInputSplit()}; null until one is set. */
+  private InputSplit split = null;
+
+  /**
+   * Wraps the given Tez reporter.
+   *
+   * @param reporter the reporter to delegate to; it is cast to
+   *          {@link TezTaskReporterImpl}, so any other non-null
+   *          implementation fails with a ClassCastException
+   */
+  public MRTaskReporter(TezTaskReporter reporter) {
+    this.reporter = (TezTaskReporterImpl) reporter;
+  }
+
+  /** Marks that a progress update should be sent. */
+  void setProgressFlag() {
+    reporter.setProgressFlag();
+  }
+
+  /**
+   * Clears the progress flag on the wrapped reporter.
+   *
+   * @return the value the flag had before it was cleared
+   */
+  boolean resetProgressFlag() {
+    return reporter.resetProgressFlag();
+  }
+
+  /** Forwards the status text to the wrapped reporter. */
+  public void setStatus(String status) {
+    reporter.setStatus(status);
+  }
+
+  /** Forwards the progress value to the wrapped reporter. */
+  public void setProgress(float progress) {
+    reporter.setProgress(progress);
+  }
+
+  /** @return the progress value reported by the wrapped reporter */
+  public float getProgress() {
+    return reporter.getProgress();
+  }
+
+  /** Signals liveness/progress through the wrapped reporter. */
+  public void progress() {
+    reporter.progress();
+  }
+
+  /**
+   * Looks up a counter by group and name.
+   *
+   * @return the counter wrapped as an MR counter, or null when the wrapped
+   *         reporter returns null
+   */
+  public Counters.Counter getCounter(String group, String name) {
+    return wrap(reporter.getCounter(group, name));
+  }
+
+  /**
+   * Looks up a counter by enum key.
+   *
+   * @return the counter wrapped as an MR counter, or null when the wrapped
+   *         reporter returns null
+   */
+  public Counters.Counter getCounter(Enum<?> name) {
+    return wrap(reporter.getCounter(name));
+  }
+
+  /** Adapts a Tez counter to the MR counter type, passing null through. */
+  private static Counters.Counter wrap(TezCounter counter) {
+    return counter == null ? null : new MRCounters.MRCounter(counter);
+  }
+
+  /** Forwards the increment to the wrapped reporter. */
+  public void incrCounter(Enum<?> key, long amount) {
+    reporter.incrCounter(key, amount);
+  }
+
+  /** Forwards the increment to the wrapped reporter. */
+  public void incrCounter(String group, String counter, long amount) {
+    reporter.incrCounter(group, counter, amount);
+  }
+
+  /** Stores the split later returned by {@link #getInputSplit()}. */
+  public void setInputSplit(InputSplit split) {
+    this.split = split;
+  }
+
+  /**
+   * @return the split previously stored with {@link #setInputSplit}
+   * @throws UnsupportedOperationException if no split has been set
+   */
+  public InputSplit getInputSplit() throws UnsupportedOperationException {
+    if (split != null) {
+      return split;
+    }
+    throw new UnsupportedOperationException("Input only available on map");
+  }
+
+  /** Starts the wrapped reporter's communication thread. */
+  public void startCommunicationThread() {
+    reporter.startCommunicationThread();
+  }
+
+  /**
+   * Stops the wrapped reporter's communication thread.
+   *
+   * @throws InterruptedException propagated from the wrapped reporter
+   */
+  public void stopCommunicationThread() throws InterruptedException {
+    reporter.stopCommunicationThread();
+  }
+}
Propchange: incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/MRTaskReporter.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/TezTaskReporterImpl.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/TezTaskReporterImpl.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/TezTaskReporterImpl.java (added)
+++ incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/TezTaskReporterImpl.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,271 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.mapreduce.processor;
+
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.ipc.ProtocolSignature;
+import org.apache.hadoop.util.Progress;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.tez.common.counters.TezCounter;
+import org.apache.tez.engine.records.TezTaskAttemptID;
+import org.apache.tez.engine.records.TezTaskDependencyCompletionEventsUpdate;
+import org.apache.tez.mapreduce.hadoop.TezTaskUmbilicalProtocol;
+
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+class TezTaskReporterImpl
+ implements org.apache.tez.common.TezTaskReporter, Runnable {
+
+ private static final Log LOG = LogFactory.getLog(TezTaskReporterImpl.class);
+
+ private final MRTask mrTask;
+ private final TezTaskUmbilicalProtocol umbilical;
+ private final Progress taskProgress;
+
+ private Thread pingThread = null;
+ private boolean done = true;
+ private Object lock = new Object();
+
+ /**
+ * flag that indicates whether progress update needs to be sent to parent.
+ * If true, it has been set. If false, it has been reset.
+ * Using AtomicBoolean since we need an atomic read & reset method.
+ */
+ private AtomicBoolean progressFlag = new AtomicBoolean(false);
+
+ /**
+  * Creates a reporter for the given task.
+  *
+  * @param mrTask the task whose counters, status and progress are reported;
+  *          its {@code getProgress()} result is captured here
+  * @param umbilical channel used to talk to the parent
+  */
+ TezTaskReporterImpl(MRTask mrTask, TezTaskUmbilicalProtocol umbilical) {
+   this.umbilical = umbilical;
+   this.mrTask = mrTask;
+   this.taskProgress = mrTask.getProgress();
+ }
+
+ // getters and setters for flag
+ /** Raises the flag that requests a progress update on the next report. */
+ void setProgressFlag() {
+   this.progressFlag.set(true);
+ }
+
+ boolean resetProgressFlag() {
+ return progressFlag.getAndSet(false);
+ }
+
+ /**
+ * Meant to store a normalized status string on the task progress and request
+ * a progress update. It is currently disabled: the method returns
+ * immediately, so the status is neither stored nor reported.
+ *
+ * @param status the status text; ignored while the short-circuit is in place
+ */
+ public void setStatus(String status) {
+ // FIXME - BADLY
+ // The constant-true guard makes everything below dead code while still
+ // allowing it to compile; remove the guard to re-enable status reporting.
+ if (true) {
+ return;
+ }
+ taskProgress.setStatus(
+ MRTask.normalizeStatus(status, this.mrTask.jobConf));
+ // indicate that progress update needs to be sent
+ setProgressFlag();
+ }
+
+ /**
+  * Sets the progress of the current phase and requests a progress update.
+  * The value is applied to {@code taskProgress.phase()}, so the task is
+  * expected to have phases.
+  *
+  * @param progress progress value for the current phase
+  */
+ public void setProgress(float progress) {
+   final Progress currentPhase = taskProgress.phase();
+   currentPhase.set(progress);
+   // Make sure the new value is sent with the next report.
+   setProgressFlag();
+ }
+
+ /** @return the progress value currently held by the task progress object */
+ public float getProgress() {
+   final float current = taskProgress.getProgress();
+   return current;
+ }
+
+ /** Records that the task made progress so that an update gets sent. */
+ public void progress() {
+   this.setProgressFlag();
+ }
+
+ /**
+  * Finds a counter by group and name in the task's counters.
+  *
+  * @return the counter, or null when the task has no counters object
+  */
+ public TezCounter getCounter(String group, String name) {
+   if (this.mrTask.counters == null) {
+     return null;
+   }
+   return this.mrTask.counters.findCounter(group, name);
+ }
+
+ /**
+  * Finds a counter by enum key in the task's counters.
+  *
+  * @return the counter, or null when the task has no counters object
+  */
+ public TezCounter getCounter(Enum<?> name) {
+   if (this.mrTask.counters == null) {
+     return null;
+   }
+   return this.mrTask.counters.findCounter(name);
+ }
+
+ /**
+  * Adds {@code amount} to the counter identified by {@code key}, if the task
+  * has a counters object, and always requests a progress update.
+  */
+ public void incrCounter(Enum<?> key, long amount) {
+   final boolean countersAvailable = this.mrTask.counters != null;
+   if (countersAvailable) {
+     this.mrTask.counters.findCounter(key).increment(amount);
+   }
+   // The flag is raised even when there was nothing to increment.
+   setProgressFlag();
+ }
+
+ /**
+  * Adds {@code amount} to the counter identified by group and name, if the
+  * task has a counters object, and always requests a progress update.
+  */
+ public void incrCounter(String group, String counter, long amount) {
+   final boolean countersAvailable = this.mrTask.counters != null;
+   if (countersAvailable) {
+     this.mrTask.counters.findCounter(group, counter).increment(amount);
+   }
+   // The flag is raised even when there was nothing to increment.
+   setProgressFlag();
+ }
+
+ /**
+ * Body of the communication thread. Until {@code mrTask.taskDone} is set it
+ * repeatedly waits on the lock for up to {@code MRTask.PROGRESS_INTERVAL} and
+ * then contacts the parent through the umbilical: a full status update when
+ * the progress flag had been set, otherwise a ping.
+ *
+ * The process exits with code 66 when the parent reports that it does not
+ * know this task attempt, and with code 65 after {@code MAX_RETRIES}
+ * consecutive failed rounds. When the loop ends normally the done flag is set
+ * and a thread waiting on the lock is notified.
+ */
+ public void run() {
+ final int MAX_RETRIES = 3;
+ int remainingRetries = MAX_RETRIES;
+ // get current flag value and reset it as well
+ boolean sendProgress = resetProgressFlag();
+ while (!this.mrTask.taskDone.get()) {
+ // Mark a round as in flight; stopCommunicationThread() waits for done to
+ // become true again.
+ synchronized (lock) {
+ done = false;
+ }
+ try {
+ boolean taskFound = true; // whether TT knows about this task
+ // sleep for a bit
+ synchronized(lock) {
+ if (this.mrTask.taskDone.get()) {
+ break;
+ }
+ lock.wait(MRTask.PROGRESS_INTERVAL);
+ }
+ // Re-check after waking up: the wait may have ended because the task
+ // finished and the lock was notified.
+ if (this.mrTask.taskDone.get()) {
+ break;
+ }
+
+ // sendProgress was sampled before the wait, so a flag raised while
+ // waiting is only picked up by the following round.
+ if (sendProgress) {
+ // we need to send progress update
+ this.mrTask.updateCounters();
+ this.mrTask.getStatus().statusUpdate(
+ taskProgress.get(),
+ taskProgress.toString(),
+ this.mrTask.counters);
+ taskFound =
+ umbilical.statusUpdate(
+ this.mrTask.getTaskAttemptId(), this.mrTask.getStatus());
+ this.mrTask.getStatus().clearStatus();
+ }
+ else {
+ // send ping
+ taskFound = umbilical.ping(this.mrTask.getTaskAttemptId());
+ }
+
+ // if Task Tracker is not aware of our task ID (probably because it died and
+ // came back up), kill ourselves
+ if (!taskFound) {
+ MRTask.LOG.warn("Parent died. Exiting " + this.mrTask.getTaskAttemptId());
+ resetDoneFlag();
+ System.exit(66);
+ }
+
+ // A successful round: sample the flag for the next round and restore the
+ // full retry budget.
+ sendProgress = resetProgressFlag();
+ remainingRetries = MAX_RETRIES;
+ }
+ catch (Throwable t) {
+ // Any failure in the round counts against the retry budget, including an
+ // InterruptedException thrown by lock.wait(). sendProgress is left
+ // unchanged, so the next round retries the same kind of message.
+ MRTask.LOG.info("Communication exception: " + StringUtils.stringifyException(t));
+ remainingRetries -=1;
+ if (remainingRetries == 0) {
+ ReflectionUtils.logThreadInfo(MRTask.LOG, "Communication exception", 0);
+ MRTask.LOG.warn("Last retry, killing " + this.mrTask.getTaskAttemptId());
+ resetDoneFlag();
+ System.exit(65);
+ }
+ }
+ }
+ //Notify that we are done with the work
+ resetDoneFlag();
+ }
+ /**
+ * Marks the communication loop as finished and wakes one thread waiting on
+ * the lock. Both steps happen while holding the lock that
+ * stopCommunicationThread() waits on.
+ */
+ void resetDoneFlag() {
+ synchronized (lock) {
+ done = true;
+ lock.notify();
+ }
+ }
+ /**
+  * Starts the daemon thread that runs {@link #run()}. Does nothing when a
+  * thread has already been created by an earlier call.
+  */
+ public void startCommunicationThread() {
+   if (pingThread != null) {
+     return;
+   }
+   final Thread communicator = new Thread(this, "communication thread");
+   pingThread = communicator;
+   communicator.setDaemon(true);
+   communicator.start();
+ }
+ /**
+ * Stops the communication thread, if one was started: wakes it up, waits
+ * until the done flag is set, then interrupts and joins it.
+ *
+ * @throws InterruptedException if the calling thread is interrupted while
+ * waiting on the lock or joining the communication thread
+ */
+ public void stopCommunicationThread() throws InterruptedException {
+ if (pingThread != null) {
+ // Intent of the lock is to not send an interrupt in the middle of an
+ // umbilical.ping or umbilical.statusUpdate
+ synchronized(lock) {
+ // Wake the thread if it is waiting on the lock. Otherwise wait for the RPC call to return.
+ lock.notify();
+ }
+
+ // Block until the communication loop reports that it has finished; the
+ // loop re-checks the condition after every wake-up.
+ synchronized (lock) {
+ while (!done) {
+ lock.wait();
+ }
+ }
+ pingThread.interrupt();
+ pingThread.join();
+ }
+ }
+
+ /**
+  * Fetches completion events of dependent tasks by forwarding the request,
+  * unchanged, to the umbilical.
+  *
+  * @param fromEventIdx index passed through to the umbilical
+  * @param maxEventsToFetch limit passed through to the umbilical
+  * @param reduce task attempt passed through to the umbilical
+  * @return whatever the umbilical returns
+  */
+ @Override
+ public TezTaskDependencyCompletionEventsUpdate getDependentTasksCompletionEvents(
+     int fromEventIdx, int maxEventsToFetch,
+     TezTaskAttemptID reduce) {
+   final TezTaskDependencyCompletionEventsUpdate update =
+       umbilical.getDependentTasksCompletionEvents(
+           fromEventIdx, maxEventsToFetch, reduce);
+   return update;
+ }
+
+ /**
+  * Logs a fatal error and reports it to the parent over the umbilical.
+  *
+  * The reported text is the stringified root cause of {@code throwable} when
+  * it has one, otherwise the stringified {@code throwable} itself. When no
+  * throwable is supplied the log message is reported instead, so that a
+  * missing exception object cannot prevent the failure from being reported.
+  *
+  * If the umbilical call itself fails with an IOException the process exits
+  * with status -1.
+  *
+  * @param taskAttemptId not used by this implementation; the error is
+  *          reported against {@code mrTask.getTaskAttemptId()}
+  *          (NOTE(review): confirm this is intentional)
+  * @param throwable the failure to report; may be null
+  * @param logMsg message written to the log
+  */
+ @Override
+ public void reportFatalError(TezTaskAttemptID taskAttemptId,
+     Throwable throwable, String logMsg) {
+   LOG.fatal(logMsg);
+   final String cause;
+   if (throwable == null) {
+     // Nothing to stringify; fall back to the message so the report still
+     // carries some diagnostic text.
+     cause = String.valueOf(logMsg);
+   } else {
+     Throwable tCause = throwable.getCause();
+     cause = tCause == null
+         ? StringUtils.stringifyException(throwable)
+         : StringUtils.stringifyException(tCause);
+   }
+   try {
+     umbilical.fatalError(mrTask.getTaskAttemptId(), cause);
+   } catch (IOException ioe) {
+     LOG.fatal("Failed to contact the tasktracker", ioe);
+     System.exit(-1);
+   }
+ }
+
+ /** @return the umbilical this reporter was constructed with */
+ public TezTaskUmbilicalProtocol getUmbilical() {
+   return this.umbilical;
+ }
+
+ /**
+  * Returns the protocol version, currently the fixed value 1 regardless of
+  * the arguments.
+  */
+ @Override
+ public long getProtocolVersion(String protocol, long clientVersion)
+     throws IOException {
+   // TODO TEZAM3
+   return 1L;
+ }
+
+ /**
+  * Builds the protocol signature for this object by delegating to
+  * {@code ProtocolSignature.getProtocolSignature}.
+  */
+ @Override
+ public ProtocolSignature getProtocolSignature(String protocol,
+     long clientVersion, int clientMethodsHash) throws IOException {
+   final ProtocolSignature signature = ProtocolSignature.getProtocolSignature(
+       this, protocol, clientVersion, clientMethodsHash);
+   return signature;
+ }
+}
Propchange: incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/TezTaskReporterImpl.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/map/MapProcessor.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/map/MapProcessor.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/map/MapProcessor.java (added)
+++ incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/map/MapProcessor.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,377 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.mapreduce.processor.map;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.JobContext;
+import org.apache.hadoop.mapred.MapRunnable;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.map.WrappedMapper;
+import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitIndex;
+import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
+import org.apache.hadoop.mapreduce.split.SplitMetaInfoReaderTez;
+import org.apache.hadoop.util.Progress;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.tez.common.TezTask;
+import org.apache.tez.common.TezTaskStatus;
+import org.apache.tez.common.counters.TaskCounter;
+import org.apache.tez.common.counters.TezCounter;
+import org.apache.tez.engine.api.Input;
+import org.apache.tez.engine.api.Master;
+import org.apache.tez.engine.api.Output;
+import org.apache.tez.engine.api.Processor;
+import org.apache.tez.engine.common.sort.SortingOutput;
+import org.apache.tez.mapreduce.hadoop.IDConverter;
+import org.apache.tez.mapreduce.hadoop.mapred.TaskAttemptContextImpl;
+import org.apache.tez.mapreduce.input.SimpleInput;
+import org.apache.tez.mapreduce.output.SimpleOutput;
+import org.apache.tez.mapreduce.processor.MRTask;
+import org.apache.tez.mapreduce.processor.MRTaskReporter;
+
+import com.google.inject.Inject;
+import com.google.inject.assistedinject.Assisted;
+
+@SuppressWarnings({ "unchecked", "rawtypes" })
+public class MapProcessor extends MRTask implements Processor {
+
+ private static final Log LOG = LogFactory.getLog(MapProcessor.class);
+
+ private Progress mapPhase;
+
+ /**
+  * Creates a map processor for the given task context; the context is handed
+  * straight to the {@code MRTask} constructor.
+  *
+  * @param context the injected task context
+  * @throws IOException propagated from the superclass constructor
+  */
+ @Inject
+ public MapProcessor(@Assisted TezTask context) throws IOException {
+   super(context);
+ }
+
+
+
+ /**
+  * Initializes the task and selects this task's split: all split meta info
+  * is read via {@link #readSplits()}, and the entry at the index given by
+  * the task id is turned into the {@code splitMetaInfo} index.
+  */
+ @Override
+ public void initialize(Configuration conf, Master master) throws IOException,
+     InterruptedException {
+   super.initialize(conf, master);
+   final TaskSplitMetaInfo[] splits = readSplits();
+   final int taskIndex =
+       tezTaskContext.getTaskAttemptId().getTaskID().getId();
+   final TaskSplitMetaInfo mine = splits[taskIndex];
+   splitMetaInfo =
+       new TaskSplitIndex(mine.getSplitLocation(), mine.getStartOffset());
+ }
+
+ /**
+  * Runs the map task: validates the input, initializes the task, input and
+  * output, sets up the map phase, runs the mapper through the old or new
+  * API depending on the job configuration, and finally calls {@code done}.
+  *
+  * @param in the task input; must be a {@link SimpleInput}
+  * @param out the task output
+  * @throws IOException if {@code in} is not a {@link SimpleInput}, or
+  *           propagated from initialization or the mapper run
+  * @throws InterruptedException propagated from initialization or the
+  *           mapper run
+  */
+ @Override
+ public void process(
+     final Input in,
+     final Output out)
+     throws IOException, InterruptedException {
+   // Sanity check first: rejecting an unsupported input before anything is
+   // initialized avoids leaving a half-initialized task, input and output
+   // behind.
+   if (!(in instanceof SimpleInput)) {
+     throw new IOException("Unknown input! - "
+         + (in == null ? null : in.getClass()));
+   }
+   SimpleInput input = (SimpleInput) in;
+
+   MRTaskReporter reporter = new MRTaskReporter(getTaskReporter());
+   boolean useNewApi = jobConf.getUseNewMapper();
+   initTask(jobConf, getDAGID(), reporter, useNewApi);
+
+   input.setTask(this);
+
+   if (out instanceof SimpleOutput) {
+     ((SimpleOutput) out).setTask(this);
+   } else if (out instanceof SortingOutput) {
+     ((SortingOutput) out).setTask(this);
+   }
+
+   in.initialize(jobConf, getTaskReporter());
+   out.initialize(jobConf, getTaskReporter());
+
+   // If there are no reducers then there won't be any sort. Hence the map
+   // phase will govern the entire attempt's progress.
+   if (jobConf.getNumReduceTasks() == 0) {
+     mapPhase = getProgress().addPhase("map", 1.0f);
+   } else {
+     // If there are reducers then the entire attempt's progress will be
+     // split between the map phase (67%) and the sort phase (33%).
+     mapPhase = getProgress().addPhase("map", 0.667f);
+   }
+
+   if (useNewApi) {
+     runNewMapper(jobConf, reporter, input, out, getTaskReporter());
+   } else {
+     runOldMapper(jobConf, reporter, input, out, getTaskReporter());
+   }
+
+   done(out.getOutputContext(), reporter);
+ }
+
+ /** Currently a no-op: nothing is released here. */
+ public void close() throws IOException, InterruptedException {
+   // TODO Auto-generated method stub
+ }
+
+ /**
+  * Runs the mapper through the old ({@code org.apache.hadoop.mapred}) API.
+  *
+  * The input and output are adapted to a {@link RecordReader} and an
+  * {@link OutputCollector}, and the configured {@link MapRunnable} is run
+  * over them. Afterwards the map phase is completed, the sort phase is
+  * entered when the job has reducers, and a status update is sent. The
+  * reader and the output are closed whether or not the run succeeded.
+  *
+  * @param job job configuration
+  * @param reporter reporter handed to the map runner
+  * @param input source of records
+  * @param output sink for map output
+  * @param master not used by this method
+  */
+ void runOldMapper(
+     final JobConf job,
+     final MRTaskReporter reporter,
+     final SimpleInput input,
+     final Output output,
+     final Master master
+     ) throws IOException, InterruptedException {
+
+   RecordReader in = new OldRecordReader(input);
+
+   int numReduceTasks = job.getNumReduceTasks();
+   LOG.info("numReduceTasks: " + numReduceTasks);
+
+   OutputCollector collector = new OldOutputCollector(output);
+
+   MapRunnable runner =
+       (MapRunnable) ReflectionUtils.newInstance(job.getMapRunnerClass(), job);
+
+   try {
+     runner.run(in, collector, (Reporter) reporter);
+     mapPhase.complete();
+     // start the sort phase only if there are reducers
+     if (numReduceTasks > 0) {
+       setPhase(TezTaskStatus.Phase.SORT);
+     }
+     this.statusUpdate();
+   } finally {
+     // Nested so that a failure while closing the input cannot prevent the
+     // output from being closed.
+     try {
+       in.close(); // close input
+     } finally {
+       output.close();
+     }
+   }
+ }
+
+ /**
+  * Runs the mapper through the new ({@code org.apache.hadoop.mapreduce})
+  * API.
+  *
+  * A task context is built to resolve the mapper class, the input and
+  * output are adapted to a record reader and record writer, and the mapper
+  * is run with a wrapped map context. Afterwards the map phase is
+  * completed, the sort phase is entered and a status update is sent. The
+  * reader and the writer are closed whether or not the run succeeded,
+  * matching {@code runOldMapper}.
+  *
+  * @param job job configuration
+  * @param reporter reporter used for the task and map contexts
+  * @param in source of records and of the input split
+  * @param out sink for map output
+  * @param master not used by this method
+  * @throws IOException if the mapper class cannot be found, or propagated
+  *           from the mapper run or from closing the reader/writer
+  */
+ private void runNewMapper(final JobConf job,
+     MRTaskReporter reporter,
+     final SimpleInput in,
+     Output out,
+     final Master master
+     ) throws IOException, InterruptedException {
+   // make a task context so we can get the classes
+   org.apache.hadoop.mapreduce.TaskAttemptContext taskContext =
+       new TaskAttemptContextImpl(job, getTaskAttemptId(), reporter);
+
+   // make a mapper
+   org.apache.hadoop.mapreduce.Mapper mapper;
+   try {
+     mapper = (org.apache.hadoop.mapreduce.Mapper)
+         ReflectionUtils.newInstance(taskContext.getMapperClass(), job);
+   } catch (ClassNotFoundException cnfe) {
+     throw new IOException(cnfe);
+   }
+
+   // No instanceof check is needed here: the parameter is already declared
+   // as SimpleInput.
+   org.apache.hadoop.mapreduce.RecordReader input =
+       new NewRecordReader(in);
+
+   org.apache.hadoop.mapreduce.RecordWriter output =
+       new NewOutputCollector(out);
+
+   org.apache.hadoop.mapreduce.InputSplit split = in.getNewInputSplit();
+
+   org.apache.hadoop.mapreduce.MapContext
+       mapContext =
+       new org.apache.tez.mapreduce.hadoop.mapreduce.MapContextImpl(
+           job, IDConverter.toMRTaskAttemptId(getTaskAttemptId()),
+           input, output,
+           getCommitter(),
+           reporter, split);
+
+   org.apache.hadoop.mapreduce.Mapper.Context mapperContext =
+       new WrappedMapper().getMapContext(mapContext);
+
+   try {
+     input.initialize(split, mapperContext);
+     mapper.run(mapperContext);
+     mapPhase.complete();
+     setPhase(TezTaskStatus.Phase.SORT);
+     this.statusUpdate();
+   } finally {
+     // Nested so that a failure while closing the reader cannot prevent the
+     // writer from being closed.
+     try {
+       input.close();
+     } finally {
+       output.close(mapperContext);
+     }
+   }
+ }
+
+ /**
+  * Presents a {@link SimpleInput} as a new-API record reader by forwarding
+  * every call to it.
+  */
+ private static class NewRecordReader extends
+     org.apache.hadoop.mapreduce.RecordReader {
+   /** The input all calls are forwarded to. */
+   private final SimpleInput source;
+
+   private NewRecordReader(SimpleInput in) {
+     this.source = in;
+   }
+
+   /** Lets the input set up its new-API record reader for the split. */
+   @Override
+   public void initialize(org.apache.hadoop.mapreduce.InputSplit split,
+       TaskAttemptContext context) throws IOException,
+       InterruptedException {
+     source.initializeNewRecordReader(split, context);
+   }
+
+   /** @return the result of the input's {@code hasNext()} */
+   @Override
+   public boolean nextKeyValue() throws IOException,
+       InterruptedException {
+     final boolean more = source.hasNext();
+     return more;
+   }
+
+   /** @return the input's next key */
+   @Override
+   public Object getCurrentKey() throws IOException,
+       InterruptedException {
+     final Object key = source.getNextKey();
+     return key;
+   }
+
+   /** @return the first element of the input's next values */
+   @Override
+   public Object getCurrentValue() throws IOException,
+       InterruptedException {
+     final Object value = source.getNextValues().iterator().next();
+     return value;
+   }
+
+   /** @return the progress reported by the input */
+   @Override
+   public float getProgress() throws IOException, InterruptedException {
+     final float progress = source.getProgress();
+     return progress;
+   }
+
+   /** Closes the underlying input. */
+   @Override
+   public void close() throws IOException {
+     source.close();
+   }
+ }
+
+ /**
+  * Presents a {@link SimpleInput} as an old-API {@link RecordReader}.
+  *
+  * The old API cannot throw {@link InterruptedException}, so an interrupt
+  * raised by the input is rethrown as an {@link IOException} after the
+  * thread's interrupt status has been restored, as
+  * {@code OldOutputCollector.collect} does.
+  */
+ private static class OldRecordReader implements RecordReader {
+   /** The input all calls are forwarded to. */
+   private final SimpleInput simpleInput;
+
+   private OldRecordReader(SimpleInput simpleInput) {
+     this.simpleInput = simpleInput;
+   }
+
+   /**
+    * Hands the reusable key and value objects to the input and asks it
+    * whether another record is available.
+    *
+    * @return the result of the input's {@code hasNext()}
+    * @throws IOException from the input, or wrapping an interrupt
+    */
+   @Override
+   public boolean next(Object key, Object value) throws IOException {
+     simpleInput.setKey(key);
+     simpleInput.setValue(value);
+     try {
+       return simpleInput.hasNext();
+     } catch (InterruptedException ie) {
+       // Keep the interrupt visible to callers further up the stack.
+       Thread.currentThread().interrupt();
+       throw new IOException(ie);
+     }
+   }
+
+   /** @return a key object created by the input's old-API record reader */
+   @Override
+   public Object createKey() {
+     return simpleInput.getOldRecordReader().createKey();
+   }
+
+   /** @return a value object created by the input's old-API record reader */
+   @Override
+   public Object createValue() {
+     return simpleInput.getOldRecordReader().createValue();
+   }
+
+   /** @return the position reported by the input's old-API record reader */
+   @Override
+   public long getPos() throws IOException {
+     return simpleInput.getOldRecordReader().getPos();
+   }
+
+   /** Closes the underlying input. */
+   @Override
+   public void close() throws IOException {
+     simpleInput.close();
+   }
+
+   /**
+    * @return the progress reported by the input
+    * @throws IOException from the input, or wrapping an interrupt
+    */
+   @Override
+   public float getProgress() throws IOException {
+     try {
+       return simpleInput.getProgress();
+     } catch (InterruptedException ie) {
+       // Keep the interrupt visible to callers further up the stack.
+       Thread.currentThread().interrupt();
+       throw new IOException(ie);
+     }
+   }
+ }
+
+ /**
+  * Presents an {@link Output} as an old-API {@link OutputCollector} by
+  * writing every collected pair to it.
+  */
+ private static class OldOutputCollector
+     implements OutputCollector {
+   /** Destination of every collected key/value pair. */
+   private final Output sink;
+
+   OldOutputCollector(Output output) {
+     this.sink = output;
+   }
+
+   /**
+    * Writes the pair to the output. An interrupt during the write restores
+    * the thread's interrupt status and is rethrown as an IOException.
+    */
+   public void collect(Object key, Object value) throws IOException {
+     try {
+       sink.write(key, value);
+     } catch (InterruptedException interrupted) {
+       Thread.currentThread().interrupt();
+       throw new IOException("interrupt exception", interrupted);
+     }
+   }
+ }
+
+ /**
+  * Presents an {@link Output} as a new-API record writer by forwarding
+  * writes and close to it.
+  *
+  * Declared static because it uses nothing from the enclosing
+  * {@code MapProcessor} instance, like the other adapter classes here.
+  */
+ private static class NewOutputCollector
+     extends org.apache.hadoop.mapreduce.RecordWriter {
+   /** Destination of every written key/value pair. */
+   private final Output out;
+
+   NewOutputCollector(Output out) {
+     this.out = out;
+   }
+
+   /** Writes the pair to the underlying output. */
+   @Override
+   public void write(Object key, Object value) throws IOException, InterruptedException {
+     out.write(key, value);
+   }
+
+   /** Closes the underlying output; the context argument is not used. */
+   @Override
+   public void close(TaskAttemptContext context
+       ) throws IOException, InterruptedException {
+     out.close();
+   }
+ }
+
+ /**
+  * Applies the superclass localization and then sets
+  * {@code JobContext.TASK_ISMAP} to true in the job configuration.
+  */
+ @Override
+ public void localizeConfiguration(JobConf jobConf)
+     throws IOException, InterruptedException {
+   super.localizeConfiguration(jobConf);
+   final boolean isMap = true;
+   jobConf.setBoolean(JobContext.TASK_ISMAP, isMap);
+ }
+
+ /** @return the reporter's counter for {@code TaskCounter.MAP_OUTPUT_RECORDS} */
+ @Override
+ public TezCounter getOutputRecordsCounter() {
+   final TezCounter outputRecords =
+       reporter.getCounter(TaskCounter.MAP_OUTPUT_RECORDS);
+   return outputRecords;
+ }
+
+ /** @return the reporter's counter for {@code TaskCounter.MAP_INPUT_RECORDS} */
+ @Override
+ public TezCounter getInputRecordsCounter() {
+   final TezCounter inputRecords =
+       reporter.getCounter(TaskCounter.MAP_INPUT_RECORDS);
+   return inputRecords;
+ }
+
+ /**
+  * Reads the split meta info for all tasks from the local file system,
+  * using this task's configuration.
+  *
+  * @return the array produced by {@code SplitMetaInfoReaderTez}
+  * @throws IOException propagated from the file system or the reader
+  */
+ protected TaskSplitMetaInfo[] readSplits() throws IOException {
+   return SplitMetaInfoReaderTez.readSplitMetaInfo(
+       getConf(), FileSystem.getLocal(getConf()));
+ }
+
+}
Propchange: incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/map/MapProcessor.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/reduce/ReduceProcessor.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/reduce/ReduceProcessor.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/reduce/ReduceProcessor.java (added)
+++ incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/reduce/ReduceProcessor.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,347 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.mapreduce.processor.reduce;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.io.RawComparator;
+import org.apache.hadoop.mapred.Counters.Counter;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.JobContext;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.util.Progress;
+import org.apache.hadoop.util.Progressable;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.tez.common.TezEngineTask;
+import org.apache.tez.common.TezJobConfig;
+import org.apache.tez.common.TezTask;
+import org.apache.tez.common.TezTaskStatus;
+import org.apache.tez.common.counters.TaskCounter;
+import org.apache.tez.common.counters.TezCounter;
+import org.apache.tez.engine.api.Input;
+import org.apache.tez.engine.api.Master;
+import org.apache.tez.engine.api.Output;
+import org.apache.tez.engine.api.Processor;
+import org.apache.tez.engine.common.ConfigUtils;
+import org.apache.tez.engine.common.sort.SortingOutput;
+import org.apache.tez.engine.common.sort.impl.TezRawKeyValueIterator;
+import org.apache.tez.engine.lib.input.ShuffledMergedInput;
+import org.apache.tez.mapreduce.hadoop.TezTaskUmbilicalProtocol;
+import org.apache.tez.mapreduce.hadoop.mapred.TaskAttemptContextImpl;
+import org.apache.tez.mapreduce.input.SimpleInput;
+import org.apache.tez.mapreduce.output.SimpleOutput;
+import org.apache.tez.mapreduce.processor.MRTask;
+import org.apache.tez.mapreduce.processor.MRTaskReporter;
+
+import com.google.common.base.Preconditions;
+import com.google.inject.Inject;
+import com.google.inject.assistedinject.Assisted;
+
+/**
+ * {@link Processor} implementation that drives a user-supplied reducer (old
+ * {@code mapred} API or new {@code mapreduce} API) over a
+ * {@link ShuffledMergedInput} and writes the results to the task's
+ * {@link Output}.
+ *
+ * <p>The processor expects exactly one input specification; the number of
+ * inputs it declares is recorded and later written into the job configuration
+ * as the task in-degree.
+ */
+@SuppressWarnings({ "unchecked", "rawtypes" })
+public class ReduceProcessor
+extends MRTask
+implements Processor {
+
+  private static final Log LOG = LogFactory.getLog(ReduceProcessor.class);
+
+  private Progress sortPhase;
+  private Progress reducePhase;
+
+  // Incremented once per key group / once per value handed to the reducer.
+  private Counter reduceInputKeyCounter;
+  private Counter reduceInputValueCounter;
+  // Number of inputs declared by the single input spec of this task.
+  private int numMapTasks;
+
+  /**
+   * Creates the processor for the given task context.
+   *
+   * @param context the task description; it is cast to {@link TezEngineTask}
+   *        and must carry a non-null input spec list with exactly one entry
+   * @throws NullPointerException if the input spec list is null
+   * @throws IllegalArgumentException if the input spec list does not contain
+   *         exactly one entry
+   */
+  @Inject
+  public ReduceProcessor(
+      @Assisted TezTask context
+      ) {
+    super(context);
+    TezEngineTask tezEngineContext = (TezEngineTask) context;
+    Preconditions.checkNotNull(tezEngineContext.getInputSpecList(),
+        "InputSpecList should not be null");
+    Preconditions.checkArgument(
+        tezEngineContext.getInputSpecList().size() == 1,
+        "Expected exactly one input, found : "
+            + tezEngineContext.getInputSpecList().size());
+    this.numMapTasks = tezEngineContext.getInputSpecList().get(0)
+        .getNumInputs();
+  }
+
+  /** Delegates to the parent initialization; no reduce-specific setup is done here. */
+  @Override
+  public void initialize(Configuration conf, Master master) throws IOException,
+  InterruptedException {
+    super.initialize(conf, master);
+
+  }
+
+  /**
+   * Runs the reduce task: initializes the task, input and output, sets up the
+   * progress phases and counters, and then dispatches to the old- or new-API
+   * reducer runner before signalling completion via {@code done}.
+   *
+   * @param in the task input; must be a {@link ShuffledMergedInput}
+   * @param out the task output
+   * @throws IOException if {@code in} is not a {@link ShuffledMergedInput},
+   *         if the reducer class cannot be found, or if processing fails
+   * @throws InterruptedException if processing is interrupted
+   */
+  @Override
+  public void process(Input in, Output out)
+      throws IOException, InterruptedException {
+    MRTaskReporter reporter = new MRTaskReporter(getTaskReporter());
+    // NOTE(review): this reads the *mapper* new-API flag inside the reduce
+    // processor. Confirm whether getUseNewReducer() was intended, or whether
+    // the per-stage configuration deliberately carries the flag under the
+    // mapper key.
+    boolean useNewApi = jobConf.getUseNewMapper();
+    initTask(jobConf, getDAGID(), reporter, useNewApi);
+
+    // Hand this task to the input/output implementations that accept one.
+    if (in instanceof SimpleInput) {
+      ((SimpleInput)in).setTask(this);
+    } else if (in instanceof ShuffledMergedInput) {
+      ((ShuffledMergedInput)in).setTask(this);
+    }
+
+    if (out instanceof SimpleOutput) {
+      ((SimpleOutput)out).setTask(this);
+    } else if (out instanceof SortingOutput) {
+      ((SortingOutput)out).setTask(this);
+    }
+
+    in.initialize(jobConf, getTaskReporter());
+    out.initialize(jobConf, getTaskReporter());
+
+    sortPhase  = getProgress().addPhase("sort");
+    reducePhase = getProgress().addPhase("reduce");
+    sortPhase.complete();                         // sort is complete
+    setPhase(TezTaskStatus.Phase.REDUCE);
+
+    this.statusUpdate();
+
+    Class keyClass = ConfigUtils.getMapOutputKeyClass(jobConf);
+    Class valueClass = ConfigUtils.getMapOutputValueClass(jobConf);
+    RawComparator comparator =
+        ConfigUtils.getOutputValueGroupingComparator(jobConf);
+
+    reduceInputKeyCounter =
+        reporter.getCounter(TaskCounter.REDUCE_INPUT_GROUPS);
+    reduceInputValueCounter =
+        reporter.getCounter(TaskCounter.REDUCE_INPUT_RECORDS);
+
+    // Sanity check
+    if (!(in instanceof ShuffledMergedInput)) {
+      throw new IOException("Illegal input to reduce: " + in.getClass());
+    }
+    ShuffledMergedInput shuffleInput = (ShuffledMergedInput)in;
+
+    if (useNewApi) {
+      try {
+        runNewReducer(
+            jobConf,
+            (TezTaskUmbilicalProtocol)getUmbilical(), reporter,
+            shuffleInput, comparator,  keyClass, valueClass,
+            out);
+      } catch (ClassNotFoundException cnfe) {
+        throw new IOException(cnfe);
+      }
+    } else {
+      runOldReducer(
+          jobConf, (TezTaskUmbilicalProtocol)getUmbilical(), reporter,
+          shuffleInput, comparator, keyClass, valueClass, out);
+    }
+
+    done(out.getOutputContext(), reporter);
+  }
+
+  /** Currently a no-op. */
+  public void close() throws IOException, InterruptedException {
+    // TODO Auto-generated method stub
+
+  }
+
+  /**
+   * Runs an old-API ({@code org.apache.hadoop.mapred}) reducer over the input.
+   *
+   * <p>The reducer and the output are each closed at most once. When the
+   * reduce loop (or one of the regular close calls) fails with any exception,
+   * whatever has not been closed yet is still closed; an {@link IOException}
+   * raised by that clean-up is logged and suppressed so the original failure
+   * is the one that propagates.
+   *
+   * @param job the job configuration used to instantiate the reducer
+   * @param umbilical not used by this method
+   * @param reporter progress/counter reporter passed to the reducer
+   * @param input the merged input whose iterator feeds the reducer
+   * @param comparator not used by this method; the grouping comparator is
+   *        taken from {@code job} instead
+   * @param keyClass class of the input keys
+   * @param valueClass class of the input values
+   * @param output destination for the reducer's emitted records
+   * @throws IOException if reading, reducing, writing or closing fails
+   * @throws InterruptedException if writing or closing is interrupted
+   */
+  void runOldReducer(JobConf job,
+      TezTaskUmbilicalProtocol umbilical,
+      final MRTaskReporter reporter,
+      ShuffledMergedInput input,
+      RawComparator comparator,
+      Class keyClass,
+      Class valueClass,
+      final Output output) throws IOException, InterruptedException {
+
+    Reducer reducer =
+        ReflectionUtils.newInstance(job.getReducerClass(), job);
+
+    // make output collector
+
+    OutputCollector collector =
+        new OutputCollector() {
+      public void collect(Object key, Object value)
+          throws IOException {
+        try {
+          output.write(key, value);
+        } catch (InterruptedException ie) {
+          // OutputCollector.collect cannot throw InterruptedException.
+          throw new IOException(ie);
+        }
+      }
+    };
+
+    // Flags are raised *before* the close call so that a close which throws
+    // is never retried from the finally block.
+    boolean reducerCloseAttempted = false;
+    boolean outputCloseAttempted = false;
+
+    // apply reduce function
+    try {
+      ReduceValuesIterator values =
+          new ReduceValuesIterator(
+              input,
+              job.getOutputValueGroupingComparator(), keyClass, valueClass,
+              job, reporter, reduceInputValueCounter, reducePhase);
+
+      values.informReduceProgress();
+      while (values.more()) {
+        reduceInputKeyCounter.increment(1);
+        reducer.reduce(values.getKey(), values, collector, reporter);
+        values.nextKey();
+        values.informReduceProgress();
+      }
+
+      reducerCloseAttempted = true;
+      reducer.close();
+      outputCloseAttempted = true;
+      output.close();
+    } finally {
+      // Only reached with work left to do when something above failed.
+      if (!reducerCloseAttempted) {
+        try {
+          reducer.close();
+        } catch (IOException ignored) {
+          LOG.warn("Ignoring exception while closing reducer after failure",
+              ignored);
+        }
+      }
+
+      if (!outputCloseAttempted) {
+        try {
+          output.close();
+        } catch (IOException ignored) {
+          LOG.warn("Ignoring exception while closing output after failure",
+              ignored);
+        }
+      }
+    }
+  }
+
+  /**
+   * Values iterator that counts every value handed to the reducer and can
+   * push the underlying iterator's progress into the reduce phase.
+   */
+  private static class ReduceValuesIterator<KEY,VALUE>
+  extends org.apache.tez.engine.common.task.impl.ValuesIterator<KEY,VALUE> {
+    private Counter reduceInputValueCounter;
+    private Progress reducePhase;
+
+    public ReduceValuesIterator (ShuffledMergedInput in,
+        RawComparator<KEY> comparator,
+        Class<KEY> keyClass,
+        Class<VALUE> valClass,
+        Configuration conf, Progressable reporter,
+        Counter reduceInputValueCounter,
+        Progress reducePhase)
+            throws IOException {
+      super(in.getIterator(), comparator, keyClass, valClass, conf, reporter);
+      this.reduceInputValueCounter = reduceInputValueCounter;
+      this.reducePhase = reducePhase;
+    }
+
+    /** Counts the value, then returns it via {@link #moveToNext()}. */
+    @Override
+    public VALUE next() {
+      reduceInputValueCounter.increment(1);
+      return moveToNext();
+    }
+
+    /** Advances the parent iterator without touching the counter. */
+    protected VALUE moveToNext() {
+      return super.next();
+    }
+
+    /** Copies the raw iterator's progress into the reduce phase and pings the reporter. */
+    public void informReduceProgress() {
+      reducePhase.set(super.in.getProgress().getProgress()); // update progress
+      reporter.progress();
+    }
+  }
+
+  /**
+   * Runs a new-API ({@code org.apache.hadoop.mapreduce}) reducer over the input.
+   *
+   * <p>The record writer wrapping {@code out} is closed in a {@code finally}
+   * block, so the output is closed even when the reducer throws.
+   *
+   * @param job the job configuration used to instantiate the reducer
+   * @param umbilical not used by this method
+   * @param reporter receives progress after every record read
+   * @param input the merged input whose raw iterator feeds the reducer
+   * @param comparator grouping comparator passed to the reduce context
+   * @param keyClass class of the input keys
+   * @param valueClass class of the input values
+   * @param out destination for the reducer's emitted records
+   * @throws IOException if reading, reducing, writing or closing fails
+   * @throws InterruptedException if the reducer or the output is interrupted
+   * @throws ClassNotFoundException if the reducer class cannot be resolved
+   */
+  void runNewReducer(JobConf job,
+      final TezTaskUmbilicalProtocol umbilical,
+      final MRTaskReporter reporter,
+      ShuffledMergedInput input,
+      RawComparator comparator,
+      Class keyClass,
+      Class valueClass,
+      final Output out
+      ) throws IOException,InterruptedException,
+      ClassNotFoundException {
+    // wrap value iterator to report progress.
+    final TezRawKeyValueIterator rawIter = input.getIterator();
+    TezRawKeyValueIterator rIter = new TezRawKeyValueIterator() {
+      public void close() throws IOException {
+        rawIter.close();
+      }
+      public DataInputBuffer getKey() throws IOException {
+        return rawIter.getKey();
+      }
+      public Progress getProgress() {
+        return rawIter.getProgress();
+      }
+      public DataInputBuffer getValue() throws IOException {
+        return rawIter.getValue();
+      }
+      public boolean next() throws IOException {
+        boolean ret = rawIter.next();
+        reporter.setProgress(rawIter.getProgress().getProgress());
+        return ret;
+      }
+    };
+
+    // make a task context so we can get the classes
+    org.apache.hadoop.mapreduce.TaskAttemptContext taskContext =
+        new TaskAttemptContextImpl(job, getTaskAttemptId(), reporter);
+
+    // make a reducer
+    org.apache.hadoop.mapreduce.Reducer reducer =
+        (org.apache.hadoop.mapreduce.Reducer)
+        ReflectionUtils.newInstance(taskContext.getReducerClass(), job);
+
+    // Adapter that forwards record writes and close to the task output.
+    org.apache.hadoop.mapreduce.RecordWriter trackedRW =
+        new org.apache.hadoop.mapreduce.RecordWriter() {
+
+      @Override
+      public void write(Object key, Object value) throws IOException,
+      InterruptedException {
+        out.write(key, value);
+      }
+
+      @Override
+      public void close(TaskAttemptContext context) throws IOException,
+      InterruptedException {
+        out.close();
+      }
+    };
+
+    org.apache.hadoop.mapreduce.Reducer.Context reducerContext =
+        createReduceContext(
+            reducer, job, getTaskAttemptId(),
+            rIter, reduceInputKeyCounter,
+            reduceInputValueCounter,
+            trackedRW,
+            committer,
+            reporter, comparator, keyClass,
+            valueClass);
+    try {
+      reducer.run(reducerContext);
+    } finally {
+      // Close the output even if the reducer failed part-way through.
+      trackedRW.close(reducerContext);
+    }
+  }
+
+  /**
+   * Applies the parent localization, then marks the configuration as a
+   * non-map task and records the number of inputs as the task in-degree.
+   */
+  @Override
+  public void localizeConfiguration(JobConf jobConf)
+      throws IOException, InterruptedException {
+    super.localizeConfiguration(jobConf);
+    jobConf.setBoolean(JobContext.TASK_ISMAP, false);
+    jobConf.setInt(TezJobConfig.TEZ_ENGINE_TASK_INDEGREE, numMapTasks);
+  }
+
+  /** Returns the {@code REDUCE_OUTPUT_RECORDS} counter from the reporter. */
+  @Override
+  public TezCounter getOutputRecordsCounter() {
+    return reporter.getCounter(TaskCounter.REDUCE_OUTPUT_RECORDS);
+  }
+
+  /**
+   * Returns the {@code REDUCE_INPUT_GROUPS} counter from the reporter.
+   */
+  @Override
+  public TezCounter getInputRecordsCounter() {
+    // NOTE(review): this returns the input *groups* counter although a
+    // REDUCE_INPUT_RECORDS counter is also used by this class - confirm
+    // which one callers expect.
+    return reporter.getCounter(TaskCounter.REDUCE_INPUT_GROUPS);
+  }
+}
Propchange: incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/reduce/ReduceProcessor.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/task/FinalTask.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/task/FinalTask.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/task/FinalTask.java (added)
+++ incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/task/FinalTask.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.mapreduce.task;
+
+import org.apache.tez.engine.api.Input;
+import org.apache.tez.engine.api.Output;
+import org.apache.tez.engine.api.Processor;
+import org.apache.tez.engine.api.Task;
+import org.apache.tez.engine.lib.input.ShuffledMergedInput;
+import org.apache.tez.engine.runtime.InputFactory;
+import org.apache.tez.engine.runtime.TezEngineFactory;
+import org.apache.tez.engine.runtime.TezEngineFactoryImpl;
+import org.apache.tez.engine.runtime.OutputFactory;
+import org.apache.tez.engine.runtime.ProcessorFactory;
+import org.apache.tez.engine.runtime.TaskFactory;
+import org.apache.tez.engine.task.RuntimeTask;
+import org.apache.tez.mapreduce.output.SimpleOutput;
+import org.apache.tez.mapreduce.processor.reduce.ReduceProcessor;
+
+import com.google.inject.AbstractModule;
+import com.google.inject.assistedinject.FactoryModuleBuilder;
+
+public class FinalTask extends AbstractModule {
+
+ @Override
+ protected void configure() {
+ install(
+ new FactoryModuleBuilder().implement(
+ Output.class, SimpleOutput.class).
+ build(OutputFactory.class)
+ );
+ install(
+ new FactoryModuleBuilder().implement(
+ Input.class, ShuffledMergedInput.class).
+ build(InputFactory.class)
+ );
+ install(
+ new FactoryModuleBuilder().implement(
+ Processor.class, ReduceProcessor.class).
+ build(ProcessorFactory.class)
+ );
+ install(
+ new FactoryModuleBuilder().implement(
+ Task.class, RuntimeTask.class).
+ build(TaskFactory.class)
+ );
+
+ bind(TezEngineFactory.class).to(TezEngineFactoryImpl.class);
+ }
+
+}
Propchange: incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/task/FinalTask.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/task/InitialTask.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/task/InitialTask.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/task/InitialTask.java (added)
+++ incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/task/InitialTask.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.mapreduce.task;
+
+import org.apache.tez.engine.api.Input;
+import org.apache.tez.engine.api.Output;
+import org.apache.tez.engine.api.Processor;
+import org.apache.tez.engine.api.Task;
+import org.apache.tez.engine.lib.output.OnFileSortedOutput;
+import org.apache.tez.engine.runtime.InputFactory;
+import org.apache.tez.engine.runtime.TezEngineFactory;
+import org.apache.tez.engine.runtime.TezEngineFactoryImpl;
+import org.apache.tez.engine.runtime.OutputFactory;
+import org.apache.tez.engine.runtime.ProcessorFactory;
+import org.apache.tez.engine.runtime.TaskFactory;
+import org.apache.tez.engine.task.RuntimeTask;
+import org.apache.tez.mapreduce.input.SimpleInput;
+import org.apache.tez.mapreduce.processor.map.MapProcessor;
+
+import com.google.inject.AbstractModule;
+import com.google.inject.assistedinject.FactoryModuleBuilder;
+
+public class InitialTask extends AbstractModule {
+
+ @Override
+ protected void configure() {
+ install(
+ new FactoryModuleBuilder().implement(
+ Input.class, SimpleInput.class).
+ build(InputFactory.class)
+ );
+ install(
+ new FactoryModuleBuilder().implement(
+ Output.class, OnFileSortedOutput.class).
+ build(OutputFactory.class)
+ );
+ install(
+ new FactoryModuleBuilder().implement(
+ Processor.class, MapProcessor.class).
+ build(ProcessorFactory.class)
+ );
+ install(
+ new FactoryModuleBuilder().implement(
+ Task.class, RuntimeTask.class).
+ build(TaskFactory.class)
+ );
+
+ bind(TezEngineFactory.class).to(TezEngineFactoryImpl.class);
+ }
+
+}
Propchange: incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/task/InitialTask.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/task/InitialTaskWithInMemSort.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/task/InitialTaskWithInMemSort.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/task/InitialTaskWithInMemSort.java (added)
+++ incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/task/InitialTaskWithInMemSort.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.mapreduce.task;
+
+import org.apache.tez.engine.api.Input;
+import org.apache.tez.engine.api.Output;
+import org.apache.tez.engine.api.Processor;
+import org.apache.tez.engine.api.Task;
+import org.apache.tez.engine.lib.output.InMemorySortedOutput;
+import org.apache.tez.engine.runtime.InputFactory;
+import org.apache.tez.engine.runtime.TezEngineFactory;
+import org.apache.tez.engine.runtime.TezEngineFactoryImpl;
+import org.apache.tez.engine.runtime.OutputFactory;
+import org.apache.tez.engine.runtime.ProcessorFactory;
+import org.apache.tez.engine.runtime.TaskFactory;
+import org.apache.tez.engine.task.RuntimeTask;
+import org.apache.tez.mapreduce.input.SimpleInput;
+import org.apache.tez.mapreduce.processor.map.MapProcessor;
+
+import com.google.inject.AbstractModule;
+import com.google.inject.assistedinject.FactoryModuleBuilder;
+
+public class InitialTaskWithInMemSort extends AbstractModule {
+// TODO EVENTUALLY - have all types subclass a single parent instead of AbstractModule.
+
+ @Override
+ protected void configure() {
+ install(
+ new FactoryModuleBuilder().implement(
+ Input.class, SimpleInput.class).
+ build(InputFactory.class)
+ );
+ install(
+ new FactoryModuleBuilder().implement(
+ Output.class, InMemorySortedOutput.class).
+ build(OutputFactory.class)
+ );
+ install(
+ new FactoryModuleBuilder().implement(
+ Processor.class, MapProcessor.class).
+ build(ProcessorFactory.class)
+ );
+ install(
+ new FactoryModuleBuilder().implement(
+ Task.class, RuntimeTask.class).
+ build(TaskFactory.class)
+ );
+
+ bind(TezEngineFactory.class).to(TezEngineFactoryImpl.class);
+ }
+
+}
Propchange: incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/task/InitialTaskWithInMemSort.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/task/InitialTaskWithLocalSort.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/task/InitialTaskWithLocalSort.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/task/InitialTaskWithLocalSort.java (added)
+++ incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/task/InitialTaskWithLocalSort.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.mapreduce.task;
+
+import org.apache.tez.engine.api.Input;
+import org.apache.tez.engine.api.Output;
+import org.apache.tez.engine.api.Processor;
+import org.apache.tez.engine.api.Task;
+import org.apache.tez.engine.lib.output.LocalOnFileSorterOutput;
+import org.apache.tez.engine.runtime.InputFactory;
+import org.apache.tez.engine.runtime.TezEngineFactory;
+import org.apache.tez.engine.runtime.TezEngineFactoryImpl;
+import org.apache.tez.engine.runtime.OutputFactory;
+import org.apache.tez.engine.runtime.ProcessorFactory;
+import org.apache.tez.engine.runtime.TaskFactory;
+import org.apache.tez.engine.task.RuntimeTask;
+import org.apache.tez.mapreduce.input.SimpleInput;
+import org.apache.tez.mapreduce.processor.map.MapProcessor;
+
+import com.google.inject.AbstractModule;
+import com.google.inject.assistedinject.FactoryModuleBuilder;
+
+public class InitialTaskWithLocalSort extends AbstractModule {
+// TODO EVENTUALLY - have all types subclass a single parent instead of AbstractModule.
+
+ @Override
+ protected void configure() {
+ install(
+ new FactoryModuleBuilder().implement(
+ Input.class, SimpleInput.class).
+ build(InputFactory.class)
+ );
+ install(
+ new FactoryModuleBuilder().implement(
+ Output.class, LocalOnFileSorterOutput.class).
+ build(OutputFactory.class)
+ );
+ install(
+ new FactoryModuleBuilder().implement(
+ Processor.class, MapProcessor.class).
+ build(ProcessorFactory.class)
+ );
+ install(
+ new FactoryModuleBuilder().implement(
+ Task.class, RuntimeTask.class).
+ build(TaskFactory.class)
+ );
+
+ bind(TezEngineFactory.class).to(TezEngineFactoryImpl.class);
+ }
+
+}
Propchange: incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/task/InitialTaskWithLocalSort.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/task/IntermediateTask.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/task/IntermediateTask.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/task/IntermediateTask.java (added)
+++ incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/task/IntermediateTask.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.mapreduce.task;
+
+import org.apache.tez.engine.api.Input;
+import org.apache.tez.engine.api.Output;
+import org.apache.tez.engine.api.Processor;
+import org.apache.tez.engine.api.Task;
+import org.apache.tez.engine.lib.input.ShuffledMergedInput;
+import org.apache.tez.engine.lib.output.OnFileSortedOutput;
+import org.apache.tez.engine.runtime.InputFactory;
+import org.apache.tez.engine.runtime.TezEngineFactory;
+import org.apache.tez.engine.runtime.TezEngineFactoryImpl;
+import org.apache.tez.engine.runtime.OutputFactory;
+import org.apache.tez.engine.runtime.ProcessorFactory;
+import org.apache.tez.engine.runtime.TaskFactory;
+import org.apache.tez.engine.task.RuntimeTask;
+import org.apache.tez.mapreduce.processor.reduce.ReduceProcessor;
+
+import com.google.inject.AbstractModule;
+import com.google.inject.assistedinject.FactoryModuleBuilder;
+
+
+public class IntermediateTask extends AbstractModule {
+
+ @Override
+ protected void configure() {
+ install(
+ new FactoryModuleBuilder().implement(
+ Input.class, ShuffledMergedInput.class).
+ build(InputFactory.class)
+ );
+ install(
+ new FactoryModuleBuilder().implement(
+ Output.class, OnFileSortedOutput.class).
+ build(OutputFactory.class)
+ );
+ install(
+ new FactoryModuleBuilder().implement(
+ Processor.class, ReduceProcessor.class).
+ build(ProcessorFactory.class)
+ );
+ install(
+ new FactoryModuleBuilder().implement(
+ Task.class, RuntimeTask.class).
+ build(TaskFactory.class)
+ );
+
+ bind(TezEngineFactory.class).to(TezEngineFactoryImpl.class);
+ }
+
+}
Propchange: incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/task/IntermediateTask.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/task/LocalFinalTask.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/task/LocalFinalTask.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/task/LocalFinalTask.java (added)
+++ incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/task/LocalFinalTask.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.mapreduce.task;
+
+import org.apache.tez.engine.api.Input;
+import org.apache.tez.engine.api.Output;
+import org.apache.tez.engine.api.Processor;
+import org.apache.tez.engine.api.Task;
+import org.apache.tez.engine.lib.input.LocalMergedInput;
+import org.apache.tez.engine.runtime.InputFactory;
+import org.apache.tez.engine.runtime.TezEngineFactory;
+import org.apache.tez.engine.runtime.TezEngineFactoryImpl;
+import org.apache.tez.engine.runtime.OutputFactory;
+import org.apache.tez.engine.runtime.ProcessorFactory;
+import org.apache.tez.engine.runtime.TaskFactory;
+import org.apache.tez.engine.task.RuntimeTask;
+import org.apache.tez.mapreduce.output.SimpleOutput;
+import org.apache.tez.mapreduce.processor.reduce.ReduceProcessor;
+
+import com.google.inject.AbstractModule;
+import com.google.inject.assistedinject.FactoryModuleBuilder;
+
+public class LocalFinalTask extends AbstractModule {
+
+ @Override
+ protected void configure() {
+ install(
+ new FactoryModuleBuilder().implement(
+ Output.class, SimpleOutput.class).
+ build(OutputFactory.class)
+ );
+ install(
+ new FactoryModuleBuilder().implement(
+ Input.class, LocalMergedInput.class).
+ build(InputFactory.class)
+ );
+ install(
+ new FactoryModuleBuilder().implement(
+ Processor.class, ReduceProcessor.class).
+ build(ProcessorFactory.class)
+ );
+ install(
+ new FactoryModuleBuilder().implement(
+ Task.class, RuntimeTask.class).
+ build(TaskFactory.class)
+ );
+
+ bind(TezEngineFactory.class).to(TezEngineFactoryImpl.class);
+ }
+
+}
Propchange: incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/task/LocalFinalTask.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/task/MapOnlyTask.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/task/MapOnlyTask.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/task/MapOnlyTask.java (added)
+++ incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/task/MapOnlyTask.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.mapreduce.task;
+
+import org.apache.tez.engine.api.Input;
+import org.apache.tez.engine.api.Output;
+import org.apache.tez.engine.api.Processor;
+import org.apache.tez.engine.api.Task;
+import org.apache.tez.engine.runtime.InputFactory;
+import org.apache.tez.engine.runtime.TezEngineFactory;
+import org.apache.tez.engine.runtime.TezEngineFactoryImpl;
+import org.apache.tez.engine.runtime.OutputFactory;
+import org.apache.tez.engine.runtime.ProcessorFactory;
+import org.apache.tez.engine.runtime.TaskFactory;
+import org.apache.tez.engine.task.RuntimeTask;
+import org.apache.tez.mapreduce.input.SimpleInput;
+import org.apache.tez.mapreduce.output.SimpleOutput;
+import org.apache.tez.mapreduce.processor.map.MapProcessor;
+
+import com.google.inject.AbstractModule;
+import com.google.inject.assistedinject.FactoryModuleBuilder;
+
+public class MapOnlyTask extends AbstractModule {
+
+ @Override
+ protected void configure() {
+ install(
+ new FactoryModuleBuilder().implement(
+ Input.class, SimpleInput.class).
+ build(InputFactory.class)
+ );
+ install(
+ new FactoryModuleBuilder().implement(
+ Output.class, SimpleOutput.class).
+ build(OutputFactory.class)
+ );
+ install(
+ new FactoryModuleBuilder().implement(
+ Processor.class, MapProcessor.class).
+ build(ProcessorFactory.class)
+ );
+ install(
+ new FactoryModuleBuilder().implement(
+ Task.class, RuntimeTask.class).
+ build(TaskFactory.class)
+ );
+
+ bind(TezEngineFactory.class).to(TezEngineFactoryImpl.class);
+ }
+
+}
Propchange: incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/task/MapOnlyTask.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/task/impl/YarnOutputFiles.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/task/impl/YarnOutputFiles.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/task/impl/YarnOutputFiles.java (added)
+++ incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/task/impl/YarnOutputFiles.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,239 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.mapreduce.task.impl;
+
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.LocalDirAllocator;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapOutputFile;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.MRConfig;
+import org.apache.tez.common.Constants;
+
+/**
+ * Manipulate the working area for the transient store for maps and reduces.
+ *
+ * This class is used by map and reduce tasks to identify the directories that
+ * they need to write to/read from for intermediate files. The callers of
+ * these methods are from child space.
+ *
+ * Every path is derived from the task attempt id stored in the configuration
+ * under {@link JobContext#TASK_ATTEMPT_ID}; {@link #setConf(Configuration)}
+ * must therefore be called before any of the path methods is used.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class YarnOutputFiles extends MapOutputFile {
+
+  private JobConf conf;
+
+  private static final String JOB_OUTPUT_DIR = "output";
+  private static final String SPILL_FILE_PATTERN = "%s_spill_%d.out";
+  private static final String SPILL_INDEX_FILE_PATTERN = SPILL_FILE_PATTERN
+      + ".index";
+
+  public YarnOutputFiles() {
+  }
+
+  // assume configured to $localdir/usercache/$user/appcache/$appId
+  private final LocalDirAllocator lDirAlloc =
+      new LocalDirAllocator(MRConfig.LOCAL_DIR);
+
+  /**
+   * Build the relative directory {@code output/<attemptId>} under which the
+   * final output of the current task attempt is stored.
+   */
+  private Path getAttemptOutputDir() {
+    return new Path(JOB_OUTPUT_DIR, conf.get(JobContext.TASK_ATTEMPT_ID));
+  }
+
+  /** Relative name of the spill file with the given number. */
+  private String spillFileName(int spillNumber) {
+    return String.format(SPILL_FILE_PATTERN,
+        conf.get(JobContext.TASK_ATTEMPT_ID), spillNumber);
+  }
+
+  /** Relative name of the index file for the spill with the given number. */
+  private String spillIndexFileName(int spillNumber) {
+    return String.format(SPILL_INDEX_FILE_PATTERN,
+        conf.get(JobContext.TASK_ATTEMPT_ID), spillNumber);
+  }
+
+  /**
+   * Return the path to local map output file created earlier
+   *
+   * @return path
+   * @throws IOException
+   */
+  public Path getOutputFile() throws IOException {
+    Path attemptOutput =
+        new Path(getAttemptOutputDir(), Constants.MAP_OUTPUT_FILENAME_STRING);
+    return lDirAlloc.getLocalPathToRead(attemptOutput.toString(), conf);
+  }
+
+  /**
+   * Create a local map output file name.
+   *
+   * @param size the size of the file
+   * @return path
+   * @throws IOException
+   */
+  public Path getOutputFileForWrite(long size) throws IOException {
+    Path attemptOutput =
+        new Path(getAttemptOutputDir(), Constants.MAP_OUTPUT_FILENAME_STRING);
+    return lDirAlloc.getLocalPathForWrite(attemptOutput.toString(), size, conf);
+  }
+
+  /**
+   * Create a local map output file name on the same volume.
+   *
+   * @param existing a path whose parent directory is used as the base for
+   *        the {@code output/<attemptId>} directory
+   * @return path
+   */
+  public Path getOutputFileForWriteInVolume(Path existing) {
+    Path outputDir = new Path(existing.getParent(), JOB_OUTPUT_DIR);
+    Path attemptOutputDir = new Path(outputDir,
+        conf.get(JobContext.TASK_ATTEMPT_ID));
+    return new Path(attemptOutputDir, Constants.MAP_OUTPUT_FILENAME_STRING);
+  }
+
+  /**
+   * Return the path to a local map output index file created earlier
+   *
+   * @return path
+   * @throws IOException
+   */
+  public Path getOutputIndexFile() throws IOException {
+    Path attemptIndexOutput =
+        new Path(getAttemptOutputDir(), Constants.MAP_OUTPUT_FILENAME_STRING +
+            Constants.MAP_OUTPUT_INDEX_SUFFIX_STRING);
+    return lDirAlloc.getLocalPathToRead(attemptIndexOutput.toString(), conf);
+  }
+
+  /**
+   * Create a local map output index file name.
+   *
+   * @param size the size of the file
+   * @return path
+   * @throws IOException
+   */
+  public Path getOutputIndexFileForWrite(long size) throws IOException {
+    Path attemptIndexOutput =
+        new Path(getAttemptOutputDir(), Constants.MAP_OUTPUT_FILENAME_STRING +
+            Constants.MAP_OUTPUT_INDEX_SUFFIX_STRING);
+    return lDirAlloc.getLocalPathForWrite(attemptIndexOutput.toString(),
+        size, conf);
+  }
+
+  /**
+   * Create a local map output index file name on the same volume.
+   *
+   * @param existing a path whose parent directory is used as the base for
+   *        the {@code output/<attemptId>} directory
+   * @return path
+   */
+  public Path getOutputIndexFileForWriteInVolume(Path existing) {
+    Path outputDir = new Path(existing.getParent(), JOB_OUTPUT_DIR);
+    Path attemptOutputDir = new Path(outputDir,
+        conf.get(JobContext.TASK_ATTEMPT_ID));
+    return new Path(attemptOutputDir, Constants.MAP_OUTPUT_FILENAME_STRING +
+        Constants.MAP_OUTPUT_INDEX_SUFFIX_STRING);
+  }
+
+  /**
+   * Return a local map spill file created earlier.
+   *
+   * @param spillNumber the number
+   * @return path
+   * @throws IOException
+   */
+  public Path getSpillFile(int spillNumber) throws IOException {
+    return lDirAlloc.getLocalPathToRead(spillFileName(spillNumber), conf);
+  }
+
+  /**
+   * Create a local map spill file name.
+   *
+   * @param spillNumber the number
+   * @param size the size of the file
+   * @return path
+   * @throws IOException
+   */
+  public Path getSpillFileForWrite(int spillNumber, long size)
+      throws IOException {
+    // The name is formatted exactly once; re-running it through
+    // String.format would treat the attempt id as a format string.
+    return lDirAlloc.getLocalPathForWrite(
+        spillFileName(spillNumber), size, conf);
+  }
+
+  /**
+   * Return a local map spill index file created earlier
+   *
+   * @param spillNumber the number
+   * @return path
+   * @throws IOException
+   */
+  public Path getSpillIndexFile(int spillNumber) throws IOException {
+    return lDirAlloc.getLocalPathToRead(spillIndexFileName(spillNumber), conf);
+  }
+
+  /**
+   * Create a local map spill index file name.
+   *
+   * @param spillNumber the number
+   * @param size the size of the file
+   * @return path
+   * @throws IOException
+   */
+  public Path getSpillIndexFileForWrite(int spillNumber, long size)
+      throws IOException {
+    return lDirAlloc.getLocalPathForWrite(
+        spillIndexFileName(spillNumber), size, conf);
+  }
+
+  /**
+   * Not supported by this implementation.
+   *
+   * @param mapId a map task id
+   * @return never returns normally
+   * @throws UnsupportedOperationException always
+   */
+  public Path getInputFile(int mapId) throws IOException {
+    throw new UnsupportedOperationException("Incompatible with LocalRunner");
+  }
+
+  /**
+   * Create a local reduce input file name.
+   *
+   * @param mapId a map task id
+   * @param size the size of the file
+   * @return path
+   * @throws IOException
+   */
+  public Path getInputFileForWrite(org.apache.hadoop.mapreduce.TaskID mapId,
+      long size) throws IOException {
+    return lDirAlloc.getLocalPathForWrite(String.format(
+        Constants.REDUCE_INPUT_FILE_FORMAT_STRING,
+        getAttemptOutputDir().toString(), mapId.getId()),
+        size, conf);
+  }
+
+  /**
+   * Not supported by this implementation.
+   *
+   * @throws UnsupportedOperationException always
+   */
+  public void removeAll() throws IOException {
+    throw new UnsupportedOperationException("Incompatible with LocalRunner");
+  }
+
+  /**
+   * Store the configuration used to resolve paths. A {@link JobConf} is kept
+   * as-is; any other {@link Configuration} is wrapped in a new JobConf.
+   */
+  @Override
+  public void setConf(Configuration conf) {
+    if (conf instanceof JobConf) {
+      this.conf = (JobConf) conf;
+    } else {
+      this.conf = new JobConf(conf);
+    }
+  }
+
+  @Override
+  public Configuration getConf() {
+    return conf;
+  }
+
+}
Propchange: incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/task/impl/YarnOutputFiles.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/tez/branches/TEZ-1/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/TestTaskModules.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/TestTaskModules.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/TestTaskModules.java (added)
+++ incubator/tez/branches/TEZ-1/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/TestTaskModules.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,242 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.mapreduce;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.tez.common.TezEngineTask;
+import org.apache.tez.engine.api.Input;
+import org.apache.tez.engine.api.Master;
+import org.apache.tez.engine.api.Output;
+import org.apache.tez.engine.api.Processor;
+import org.apache.tez.engine.api.Task;
+import org.apache.tez.engine.lib.input.ShuffledMergedInput;
+import org.apache.tez.engine.lib.output.OnFileSortedOutput;
+import org.apache.tez.engine.runtime.InputFactory;
+import org.apache.tez.engine.runtime.OutputFactory;
+import org.apache.tez.engine.runtime.ProcessorFactory;
+import org.apache.tez.engine.runtime.TaskFactory;
+import org.apache.tez.engine.runtime.TezEngineFactory;
+import org.apache.tez.engine.runtime.TezEngineFactoryImpl;
+import org.apache.tez.mapreduce.hadoop.MRTaskType;
+import org.apache.tez.mapreduce.input.SimpleInput;
+import org.apache.tez.mapreduce.output.SimpleOutput;
+import org.apache.tez.mapreduce.processor.map.MapProcessor;
+import org.apache.tez.mapreduce.processor.reduce.ReduceProcessor;
+import org.apache.tez.mapreduce.task.FinalTask;
+import org.apache.tez.mapreduce.task.InitialTask;
+import org.apache.tez.mapreduce.task.IntermediateTask;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.google.inject.Guice;
+import com.google.inject.Inject;
+import com.google.inject.Injector;
+import com.google.inject.assistedinject.Assisted;
+import com.google.inject.assistedinject.FactoryModuleBuilder;
+
+public class TestTaskModules {
+
+ private static final Log LOG = LogFactory.getLog(TestTaskModules.class);
+
+ TezEngineTask taskContext;
+ JobConf job;
+
+ @Before
+ public void setUp() {
+ taskContext = new TezEngineTask(TezTestUtils.getMockTaskAttemptId(0, 0, 0,
+ MRTaskType.REDUCE), "tez", "tez", "TODO_vertexName",
+ TestInitialModule.class.getName(), null, null);
+ job = new JobConf();
+ }
+
+ @Test
+ public void testInitialTask() throws Exception {
+ Injector injector = Guice.createInjector(new TestInitialModule());
+ TezEngineFactory factory = injector.getInstance(TezEngineFactory.class);
+ Task t = factory.createTask(taskContext);
+ t.initialize(job, null);
+ }
+
+ @Test
+ public void testIntermediateTask() throws Exception {
+ Injector injector = Guice.createInjector(new TestIntermediateModule());
+ TezEngineFactory factory = injector.getInstance(TezEngineFactory.class);
+ Task t = factory.createTask(taskContext);
+ t.initialize(job, null);
+ }
+
+ @Test
+ public void testFinalTask() throws Exception {
+ Injector injector = Guice.createInjector(new TestFinalModule());
+ TezEngineFactory factory = injector.getInstance(TezEngineFactory.class);
+ Task task = factory.createTask(taskContext);
+ LOG.info("task = " + task.getClass());
+ task.initialize(job, null);
+ }
+
+ static class TestTask implements Task {
+
+ private final Input in;
+ private final Output out;
+ private final Processor processor;
+
+ @Inject
+ public TestTask(
+ @Assisted Processor processor,
+ @Assisted Input in,
+ @Assisted Output out) {
+ this.in = in;
+ this.processor = processor;
+ this.out = out;
+ }
+
+ @Override
+ public void initialize(Configuration conf, Master master)
+ throws IOException, InterruptedException {
+ LOG.info("in = " + in.getClass());
+ LOG.info("processor = " + processor.getClass());
+ LOG.info("out = " + out.getClass());
+ }
+
+ @Override
+ public Input getInput() {
+ return in;
+ }
+
+ @Override
+ public Output getOutput() {
+ return out;
+ }
+
+ @Override
+ public Processor getProcessor() {
+ return processor;
+ }
+
+ @Override
+ public void run() throws IOException, InterruptedException {
+ // TODO Auto-generated method stub
+
+ }
+
+ @Override
+ public void close() throws IOException, InterruptedException {
+ // TODO Auto-generated method stub
+
+ }
+
+ }
+
+ static class TestInitialModule extends InitialTask {
+
+ @Override
+ protected void configure() {
+ install(
+ new FactoryModuleBuilder().implement(
+ Input.class, SimpleInput.class).
+ build(InputFactory.class)
+ );
+ install(
+ new FactoryModuleBuilder().implement(
+ Output.class, OnFileSortedOutput.class).
+ build(OutputFactory.class)
+ );
+ install(
+ new FactoryModuleBuilder().implement(
+ Processor.class, MapProcessor.class).
+ build(ProcessorFactory.class)
+ );
+ install(
+ new FactoryModuleBuilder().implement(
+ Task.class, TestTask.class).
+ build(TaskFactory.class)
+ );
+
+ bind(TezEngineFactory.class).to(TezEngineFactoryImpl.class);
+ }
+
+ }
+
+
+ static class TestIntermediateModule extends IntermediateTask {
+
+ @Override
+ protected void configure() {
+ install(
+ new FactoryModuleBuilder().implement(
+ Input.class, ShuffledMergedInput.class).
+ build(InputFactory.class)
+ );
+ install(
+ new FactoryModuleBuilder().implement(
+ Output.class, OnFileSortedOutput.class).
+ build(OutputFactory.class)
+ );
+ install(
+ new FactoryModuleBuilder().implement(
+ Processor.class, ReduceProcessor.class).
+ build(ProcessorFactory.class)
+ );
+ install(
+ new FactoryModuleBuilder().implement(
+ Task.class, TestTask.class).
+ build(TaskFactory.class)
+ );
+
+ bind(TezEngineFactory.class).to(TezEngineFactoryImpl.class);
+ }
+
+ }
+
+
+ static class TestFinalModule extends FinalTask {
+
+ @Override
+ protected void configure() {
+ install(
+ new FactoryModuleBuilder().implement(
+ Output.class, SimpleOutput.class).
+ build(OutputFactory.class)
+ );
+ install(
+ new FactoryModuleBuilder().implement(
+ Input.class, ShuffledMergedInput.class).
+ build(InputFactory.class)
+ );
+ install(
+ new FactoryModuleBuilder().implement(
+ Processor.class, ReduceProcessor.class).
+ build(ProcessorFactory.class)
+ );
+ install(
+ new FactoryModuleBuilder().implement(
+ Task.class, TestTask.class).
+ build(TaskFactory.class)
+ );
+
+ bind(TezEngineFactory.class).to(TezEngineFactoryImpl.class);
+ }
+
+ }
+
+}
Propchange: incubator/tez/branches/TEZ-1/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/TestTaskModules.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/tez/branches/TEZ-1/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/TestUmbilicalProtocol.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/TestUmbilicalProtocol.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/TestUmbilicalProtocol.java (added)
+++ incubator/tez/branches/TEZ-1/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/TestUmbilicalProtocol.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,148 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.mapreduce;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.ipc.ProtocolSignature;
+import org.apache.tez.common.ContainerTask;
+import org.apache.tez.common.TezTaskStatus;
+import org.apache.tez.engine.records.OutputContext;
+import org.apache.tez.engine.records.TezTaskAttemptID;
+import org.apache.tez.engine.records.TezTaskDependencyCompletionEventsUpdate;
+import org.apache.tez.mapreduce.hadoop.ContainerContext;
+import org.apache.tez.mapreduce.hadoop.TezTaskUmbilicalProtocol;
+import org.apache.tez.mapreduce.hadoop.records.ProceedToCompletionResponse;
+
+/**
+ * Stub {@link TezTaskUmbilicalProtocol} for tests: most calls are logged and
+ * answered with fixed values, and {@link #proceedToCompletion} returns the
+ * response chosen at construction time.
+ */
+public class TestUmbilicalProtocol implements TezTaskUmbilicalProtocol {
+
+  private static final Log LOG = LogFactory.getLog(TestUmbilicalProtocol.class);
+  private final ProceedToCompletionResponse proceedToCompletionResponse;
+
+
+  /** Equivalent to {@code new TestUmbilicalProtocol(false)}. */
+  public TestUmbilicalProtocol() {
+    this(false);
+  }
+
+  /**
+   * @param shouldLinger when true the canned response is built with
+   *        {@code (false, false)}, otherwise with {@code (false, true)};
+   *        presumably the second flag tells the task whether it may
+   *        proceed - TODO confirm against ProceedToCompletionResponse
+   */
+  public TestUmbilicalProtocol(boolean shouldLinger) {
+    proceedToCompletionResponse =
+        new ProceedToCompletionResponse(false, !shouldLinger);
+  }
+
+  @Override
+  public ProtocolSignature getProtocolSignature(String arg0, long arg1, int arg2)
+      throws IOException {
+    // Not needed by the tests.
+    return null;
+  }
+
+  @Override
+  public long getProtocolVersion(String arg0, long arg1) throws IOException {
+    // Not needed by the tests.
+    return 0;
+  }
+
+  @Override
+  public TezTaskDependencyCompletionEventsUpdate getDependentTasksCompletionEvents(
+      int fromEventIdx, int maxEventsToFetch,
+      TezTaskAttemptID reduce) {
+    // Not needed by the tests.
+    return null;
+  }
+
+  @Override
+  public ContainerTask getTask(ContainerContext containerContext)
+      throws IOException {
+    // Not needed by the tests.
+    return null;
+  }
+
+  @Override
+  public boolean statusUpdate(TezTaskAttemptID taskId, TezTaskStatus taskStatus)
+      throws IOException, InterruptedException {
+    LOG.info("Got 'status-update' from " + taskId + ": status=" + taskStatus);
+    return true;
+  }
+
+  @Override
+  public void reportDiagnosticInfo(TezTaskAttemptID taskid, String trace)
+      throws IOException {
+    LOG.info("Got 'diagnostic-info' from " + taskid + ": trace=" + trace);
+  }
+
+  @Override
+  public boolean ping(TezTaskAttemptID taskid) throws IOException {
+    LOG.info("Got 'ping' from " + taskid);
+    return true;
+  }
+
+  @Override
+  public void done(TezTaskAttemptID taskid) throws IOException {
+    LOG.info("Got 'done' from " + taskid);
+  }
+
+  @Override
+  public void commitPending(TezTaskAttemptID taskId, TezTaskStatus taskStatus)
+      throws IOException, InterruptedException {
+    LOG.info("Got 'commit-pending' from " + taskId + ": status=" + taskStatus);
+  }
+
+  @Override
+  public boolean canCommit(TezTaskAttemptID taskid) throws IOException {
+    LOG.info("Got 'can-commit' from " + taskid);
+    return false;
+  }
+
+  @Override
+  public void shuffleError(TezTaskAttemptID taskId, String message)
+      throws IOException {
+    LOG.info("Got 'shuffle-error' from " + taskId + ": message=" + message);
+  }
+
+  @Override
+  public void fsError(TezTaskAttemptID taskId, String message)
+      throws IOException {
+    LOG.info("Got 'fs-error' from " + taskId + ": message=" + message);
+  }
+
+  @Override
+  public void fatalError(TezTaskAttemptID taskId, String message)
+      throws IOException {
+    LOG.info("Got 'fatal-error' from " + taskId + ": message=" + message);
+  }
+
+  @Override
+  public void outputReady(TezTaskAttemptID taskAttemptId,
+      OutputContext outputContext) throws IOException {
+    // Intentionally empty: the notification is ignored.
+  }
+
+  @Override
+  public ProceedToCompletionResponse proceedToCompletion(
+      TezTaskAttemptID taskAttemptId) throws IOException {
+    return proceedToCompletionResponse;
+  }
+
+}
Propchange: incubator/tez/branches/TEZ-1/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/TestUmbilicalProtocol.java
------------------------------------------------------------------------------
svn:eol-style = native