You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2013/09/11 06:49:46 UTC
[1/4] TEZ-417. Change Shuffle Input/Output to work with the new APIs
(part of TEZ-398). (sseth)
Updated Branches:
refs/heads/TEZ-398 e5919fa75 -> 1cf7f197d
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1cf7f197/tez-engine/src/main/java/org/apache/tez/engine/lib/output/OnFileSortedOutput.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/lib/output/OnFileSortedOutput.java b/tez-engine/src/main/java/org/apache/tez/engine/lib/output/OnFileSortedOutput.java
index 79787b7..29a4b02 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/lib/output/OnFileSortedOutput.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/lib/output/OnFileSortedOutput.java
@@ -18,52 +18,103 @@
package org.apache.tez.engine.lib.output;
import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.List;
import org.apache.hadoop.conf.Configuration;
-import org.apache.tez.common.RunningTaskContext;
-import org.apache.tez.common.TezEngineTaskContext;
-import org.apache.tez.engine.api.Master;
-import org.apache.tez.engine.api.Output;
-import org.apache.tez.engine.common.sort.SortingOutput;
+import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.apache.tez.common.TezJobConfig;
+import org.apache.tez.common.TezUtils;
+import org.apache.tez.engine.common.shuffle.newimpl.ShuffleUserPayloads.DataMovementEventPayloadProto;
import org.apache.tez.engine.common.sort.impl.ExternalSorter;
import org.apache.tez.engine.common.sort.impl.dflt.DefaultSorter;
-import org.apache.tez.engine.records.OutputContext;
+import org.apache.tez.engine.newapi.Event;
+import org.apache.tez.engine.newapi.KVWriter;
+import org.apache.tez.engine.newapi.LogicalOutput;
+import org.apache.tez.engine.newapi.TezOutputContext;
+import org.apache.tez.engine.newapi.Writer;
+import org.apache.tez.engine.newapi.events.DataMovementEvent;
+import org.apache.tez.engine.shuffle.common.ShuffleUtils;
+
+import com.google.common.collect.Lists;
/**
- * {@link OnFileSortedOutput} is an {@link Output} which sorts key/value pairs
+ * <code>OnFileSortedOutput</code> is a {@link LogicalOutput} which sorts key/value pairs
* written to it and persists it to a file.
*/
-public class OnFileSortedOutput implements SortingOutput {
+public class OnFileSortedOutput implements LogicalOutput {
protected ExternalSorter sorter;
+ protected Configuration conf;
+ protected int numOutputs;
+ protected TezOutputContext outputContext;
+ private long startTime;
+ private long endTime;
- public OnFileSortedOutput(TezEngineTaskContext task) throws IOException {
- sorter = new DefaultSorter(task);
- }
- public void initialize(Configuration conf, Master master)
- throws IOException, InterruptedException {
- sorter.initialize(conf, master);
+ @Override
+ public List<Event> initialize(TezOutputContext outputContext)
+ throws IOException {
+ this.startTime = System.nanoTime();
+ this.outputContext = outputContext;
+ sorter = new DefaultSorter();
+ this.conf = TezUtils.createConfFromUserPayload(outputContext.getUserPayload());
+ // Initialize this parameter in this conf since it is used in multiple
+ // places (wherever LocalDirAllocator is used) - TezTaskOutputFiles,
+ // TezMerger, etc.
+ this.conf.setStrings(TezJobConfig.LOCAL_DIRS, outputContext.getWorkDirs());
+ sorter.initialize(outputContext, conf, numOutputs);
+ return Collections.emptyList();
}
@Override
- public void setTask(RunningTaskContext task) {
- sorter.setTask(task);
- }
-
- public void write(Object key, Object value) throws IOException,
- InterruptedException {
- sorter.write(key, value);
+ public Writer getWriter() throws IOException {
+ return new KVWriter() {
+ @Override
+ public void write(Object key, Object value) throws IOException {
+ sorter.write(key, value);
+ }
+ };
}
- public void close() throws IOException, InterruptedException {
- sorter.flush();
- sorter.close();
+ @Override
+ public void handleEvents(List<Event> outputEvents) {
+ // Not expecting any events.
}
@Override
- public OutputContext getOutputContext() {
- return null;
+ public void setNumPhysicalOutputs(int numOutputs) {
+ this.numOutputs = numOutputs;
}
+ @Override
+ public List<Event> close() throws IOException {
+ sorter.flush();
+ sorter.close();
+ this.endTime = System.nanoTime();
+
+ String host = System.getenv(ApplicationConstants.Environment.NM_HOST
+ .toString());
+ ByteBuffer shuffleMetadata = outputContext
+ .getServiceProviderMetaData(ShuffleUtils.SHUFFLE_HANDLER_SERVICE_ID);
+ int shufflePort = ShuffleUtils.deserializeShuffleMetaData(shuffleMetadata);
+
+ DataMovementEventPayloadProto.Builder payloadBuilder = DataMovementEventPayloadProto
+ .newBuilder();
+ payloadBuilder.setHost(host);
+ payloadBuilder.setPort(shufflePort);
+ payloadBuilder.setPathComponent(outputContext.getUniqueIdentifier());
+ payloadBuilder.setRunDuration((int) ((endTime - startTime) / 1000));
+ DataMovementEventPayloadProto payloadProto = payloadBuilder.build();
+
+ List<Event> events = Lists.newArrayListWithCapacity(numOutputs);
+
+ for (int i = 0; i < numOutputs; i++) {
+ DataMovementEvent event = new DataMovementEvent(i,
+ payloadProto.toByteArray());
+ events.add(event);
+ }
+ return events;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1cf7f197/tez-engine/src/main/java/org/apache/tez/engine/newapi/KVReader.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/newapi/KVReader.java b/tez-engine/src/main/java/org/apache/tez/engine/newapi/KVReader.java
index 68e0f47..bd0e933 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/newapi/KVReader.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/newapi/KVReader.java
@@ -22,33 +22,57 @@ import java.io.IOException;
/**
* A key/value(s) pair based {@link Reader}.
+ *
+ * Example usage
+ * <code>
+ * while (kvReader.moveToNext()) {
+ * KVRecord kvRecord = kvReader.getCurrentKV();
+ * Object key = kvRecord.getKey();
+ * Iterable values = kvRecord.getValues();
+ * }</code>
+ *
*/
public interface KVReader extends Reader {
/**
- * Check if there is another key/value(s) pair
+ * Moves to the next key/value(s) pair
*
- * @return true if another key/value(s) pair exists
+ * @return true if another key/value(s) pair exists, false if there are no more.
* @throws IOException
* if an error occurs
*/
- public boolean hasNext() throws IOException;
+ public boolean moveToNext() throws IOException;
/**
- * Gets the next key.
- *
- * @return the next key, or null if none exists
+ * Returns the current key/value(s) pair. Use moveToNext() to advance.
+ * @return the current key/value(s) pair
+ * @throws IOException if an error occurs
- * if an error occurs
*/
- public Object getNextKey() throws IOException;
+ public KVRecord getCurrentKV() throws IOException;
+
+
+
/**
- * Get the next values.
- *
- * @return an <code>Iterable</code> view of the values for the current key
- * @throws IOException
- * if an error occurs
+ * Represents a key and an associated set of values
+ *
*/
- public Iterable<Object> getNextValues() throws IOException;
+ public static class KVRecord {
+
+ private Object key;
+ private Iterable<Object> values;
+
+ public KVRecord(Object key, Iterable<Object> values) {
+ this.key = key;
+ this.values = values;
+ }
+
+ public Object getKey() {
+ return this.key;
+ }
+
+ public Iterable<Object> getValues() {
+ return this.values;
+ }
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1cf7f197/tez-engine/src/main/java/org/apache/tez/engine/newapi/KVWriter.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/newapi/KVWriter.java b/tez-engine/src/main/java/org/apache/tez/engine/newapi/KVWriter.java
index f945b63..ad48912 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/newapi/KVWriter.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/newapi/KVWriter.java
@@ -23,7 +23,7 @@ import java.io.IOException;
/**
* A key/value(s) pair based {@link Writer}
*/
-public interface KVWriter {
+public interface KVWriter extends Writer {
/**
* Writes a key/value pair.
*
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1cf7f197/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezInputContextImpl.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezInputContextImpl.java b/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezInputContextImpl.java
index b4558d0..1d76d86 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezInputContextImpl.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezInputContextImpl.java
@@ -49,6 +49,9 @@ public class TezInputContextImpl extends TezTaskContextImpl
this.sourceInfo = new EventMetaData(
EventGenerator.INPUT, taskVertexName, sourceVertexName,
taskAttemptID);
+ this.uniqueIdentifier = String.format("%s_%s_%6d_%2d_%s", taskAttemptID
+ .getTaskID().getVertexID().getDAGId().toString(), taskVertexName,
+ getTaskIndex(), getAttemptNumber(), sourceVertexName);
}
@Override
@@ -70,5 +73,4 @@ public class TezInputContextImpl extends TezTaskContextImpl
public String getSourceVertexName() {
return sourceVertexName;
}
-
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1cf7f197/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezOutputContextImpl.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezOutputContextImpl.java b/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezOutputContextImpl.java
index ba632db..e5b81d0 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezOutputContextImpl.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezOutputContextImpl.java
@@ -49,6 +49,9 @@ public class TezOutputContextImpl extends TezTaskContextImpl
this.tezUmbilical = tezUmbilical;
this.sourceInfo = new EventMetaData(EventGenerator.OUTPUT, taskVertexName,
destinationVertexName, taskAttemptID);
+ this.uniqueIdentifier = String.format("%s_%s_%6d_%2d_%s", taskAttemptID
+ .getTaskID().getVertexID().getDAGId().toString(), taskVertexName,
+ getTaskIndex(), getAttemptNumber(), destinationVertexName);
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1cf7f197/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezProcessorContextImpl.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezProcessorContextImpl.java b/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezProcessorContextImpl.java
index 4e0f061..73c4a54 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezProcessorContextImpl.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezProcessorContextImpl.java
@@ -44,6 +44,9 @@ public class TezProcessorContextImpl extends TezTaskContextImpl
this.tezUmbilical = tezUmbilical;
this.sourceInfo = new EventMetaData(EventGenerator.PROCESSOR,
taskVertexName, "", taskAttemptID);
+ this.uniqueIdentifier = String.format("%s_%s_%6d_%2d", taskAttemptID
+ .getTaskID().getVertexID().getDAGId().toString(), taskVertexName,
+ getTaskIndex(), getAttemptNumber());
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1cf7f197/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezTaskContextImpl.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezTaskContextImpl.java b/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezTaskContextImpl.java
index 712eec3..b77bcdd 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezTaskContextImpl.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezTaskContextImpl.java
@@ -18,18 +18,27 @@
package org.apache.tez.engine.newapi.impl;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.util.AuxiliaryServiceHelper;
+import org.apache.tez.common.TezJobConfig;
import org.apache.tez.common.counters.TezCounters;
import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.engine.newapi.TezTaskContext;
+import org.apache.tez.engine.shuffle.common.ShuffleUtils;
public abstract class TezTaskContextImpl implements TezTaskContext {
- protected final Configuration conf;
+ private final Configuration conf;
protected final String taskVertexName;
- protected final TezTaskAttemptID taskAttemptID;
- protected final TezCounters counters;
+ private final TezTaskAttemptID taskAttemptID;
+ private final TezCounters counters;
+ private String[] workDirs;
+ protected String uniqueIdentifier;
@Private
public TezTaskContextImpl(Configuration conf,
@@ -39,9 +48,18 @@ public abstract class TezTaskContextImpl implements TezTaskContext {
this.taskVertexName = taskVertexName;
this.taskAttemptID = taskAttemptID;
this.counters = counters;
+ // TODO Maybe change this to be task id specific at some point. For now
+ // Shuffle code relies on this being a path specified by YARN
+ this.workDirs = this.conf.getStrings(TezJobConfig.LOCAL_DIRS);
}
@Override
+ public ApplicationId getApplicationId() {
+ return taskAttemptID.getTaskID().getVertexID().getDAGId()
+ .getApplicationId();
+ }
+
+ @Override
public int getTaskIndex() {
return taskAttemptID.getTaskID().getId();
}
@@ -52,8 +70,14 @@ public abstract class TezTaskContextImpl implements TezTaskContext {
}
@Override
+ public String getDAGName() {
+ // TODO NEWTEZ Change to some form of the DAG name, for now using dagId as
+ // the unique identifier.
+ return taskAttemptID.getTaskID().getVertexID().getDAGId().toString();
+ }
+
+ @Override
public String getTaskVertexName() {
- // TODO Auto-generated method stub
return taskVertexName;
}
@@ -63,5 +87,30 @@ public abstract class TezTaskContextImpl implements TezTaskContext {
return counters;
}
- // TODO Add a method to get working dir
+ @Override
+ public String[] getWorkDirs() {
+ return Arrays.copyOf(workDirs, workDirs.length);
+ }
+
+ @Override
+ public String getUniqueIdentifier() {
+ return uniqueIdentifier;
+ }
+
+ @Override
+ public void fatalError(Throwable exception, String message) {
+ // TODO NEWTEZ Implement once the TezContext communication is setup.
+ }
+
+ @Override
+ public ByteBuffer getServiceConsumerMetaData(String serviceName) {
+ // TODO NEWTEZ Make sure this data is set by the AM for the Shuffle service name.
+ return null;
+ }
+
+ @Override
+ public ByteBuffer getServiceProviderMetaData(String serviceName) {
+ return AuxiliaryServiceHelper.getServiceDataFromEnv(
+ ShuffleUtils.SHUFFLE_HANDLER_SERVICE_ID, System.getenv());
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1cf7f197/tez-engine/src/main/java/org/apache/tez/engine/shuffle/common/ShuffleUtils.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/shuffle/common/ShuffleUtils.java b/tez-engine/src/main/java/org/apache/tez/engine/shuffle/common/ShuffleUtils.java
new file mode 100644
index 0000000..3a6b2e4
--- /dev/null
+++ b/tez-engine/src/main/java/org/apache/tez/engine/shuffle/common/ShuffleUtils.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.engine.shuffle.common;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import javax.crypto.SecretKey;
+
+import org.apache.hadoop.io.DataInputByteBuffer;
+import org.apache.hadoop.security.token.Token;
+import org.apache.tez.engine.common.security.JobTokenIdentifier;
+import org.apache.tez.engine.common.security.JobTokenSecretManager;
+
+public class ShuffleUtils {
+
+ public static String SHUFFLE_HANDLER_SERVICE_ID = "mapreduce.shuffle";
+
+ public static SecretKey getJobTokenSecretFromTokenBytes(ByteBuffer meta)
+ throws IOException {
+ DataInputByteBuffer in = new DataInputByteBuffer();
+ in.reset(meta);
+ Token<JobTokenIdentifier> jt = new Token<JobTokenIdentifier>();
+ jt.readFields(in);
+ SecretKey sk = JobTokenSecretManager.createSecretKey(jt.getPassword());
+ return sk;
+ }
+
+ public static int deserializeShuffleMetaData(ByteBuffer meta)
+ throws IOException {
+ DataInputByteBuffer in = new DataInputByteBuffer();
+ try {
+ in.reset(meta);
+ int port = in.readInt();
+ return port;
+ } finally {
+ in.close();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1cf7f197/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/MRRSleepJob.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/MRRSleepJob.java b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/MRRSleepJob.java
index b2f1318..9bc430b 100644
--- a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/MRRSleepJob.java
+++ b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/MRRSleepJob.java
@@ -80,8 +80,8 @@ import org.apache.tez.dag.api.client.DAGStatus;
import org.apache.tez.engine.common.objectregistry.ObjectLifeCycle;
import org.apache.tez.engine.common.objectregistry.ObjectRegistry;
import org.apache.tez.engine.common.objectregistry.ObjectRegistryFactory;
-import org.apache.tez.engine.lib.input.ShuffledMergedInput;
-import org.apache.tez.engine.lib.output.OnFileSortedOutput;
+import org.apache.tez.engine.lib.oldinput.OldShuffledMergedInput;
+import org.apache.tez.engine.lib.oldoutput.OldOnFileSortedOutput;
import org.apache.tez.mapreduce.hadoop.InputSplitInfo;
import org.apache.tez.mapreduce.hadoop.MRHelpers;
import org.apache.tez.mapreduce.hadoop.MultiStageMRConfToTezTranslator;
@@ -611,9 +611,9 @@ public class MRRSleepJob extends Configured implements Tool {
DataMovementType.SCATTER_GATHER, DataSourceType.PERSISTED,
SchedulingType.SEQUENTIAL,
new OutputDescriptor(
- OnFileSortedOutput.class.getName()),
+ OldOnFileSortedOutput.class.getName()),
new InputDescriptor(
- ShuffledMergedInput.class.getName()))));
+ OldShuffledMergedInput.class.getName()))));
}
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1cf7f197/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/OrderedWordCount.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/OrderedWordCount.java b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/OrderedWordCount.java
index 7fae4a3..016fbda 100644
--- a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/OrderedWordCount.java
+++ b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/OrderedWordCount.java
@@ -70,8 +70,8 @@ import org.apache.tez.dag.api.EdgeProperty.DataSourceType;
import org.apache.tez.dag.api.EdgeProperty.SchedulingType;
import org.apache.tez.dag.api.client.DAGClient;
import org.apache.tez.dag.api.client.DAGStatus;
-import org.apache.tez.engine.lib.input.ShuffledMergedInput;
-import org.apache.tez.engine.lib.output.OnFileSortedOutput;
+import org.apache.tez.engine.lib.oldinput.OldShuffledMergedInput;
+import org.apache.tez.engine.lib.oldoutput.OldOnFileSortedOutput;
import org.apache.tez.mapreduce.hadoop.InputSplitInfo;
import org.apache.tez.mapreduce.hadoop.MRHelpers;
import org.apache.tez.mapreduce.hadoop.MRJobConfig;
@@ -312,9 +312,9 @@ public class OrderedWordCount {
DataMovementType.SCATTER_GATHER, DataSourceType.PERSISTED,
SchedulingType.SEQUENTIAL,
new OutputDescriptor(
- OnFileSortedOutput.class.getName()),
+ OldOnFileSortedOutput.class.getName()),
new InputDescriptor(
- ShuffledMergedInput.class.getName()))));
+ OldShuffledMergedInput.class.getName()))));
}
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1cf7f197/tez-mapreduce-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobsDAGApi.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobsDAGApi.java b/tez-mapreduce-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobsDAGApi.java
index 701ca87..12953e4 100644
--- a/tez-mapreduce-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobsDAGApi.java
+++ b/tez-mapreduce-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobsDAGApi.java
@@ -68,8 +68,8 @@ import org.apache.tez.dag.api.EdgeProperty.SchedulingType;
import org.apache.tez.dag.api.client.DAGClient;
import org.apache.tez.dag.api.client.DAGStatus;
import org.apache.tez.dag.api.client.DAGStatus.State;
-import org.apache.tez.engine.lib.input.ShuffledMergedInput;
-import org.apache.tez.engine.lib.output.OnFileSortedOutput;
+import org.apache.tez.engine.lib.oldinput.OldShuffledMergedInput;
+import org.apache.tez.engine.lib.oldoutput.OldOnFileSortedOutput;
import org.apache.tez.mapreduce.examples.MRRSleepJob;
import org.apache.tez.mapreduce.examples.MRRSleepJob.ISleepReducer;
import org.apache.tez.mapreduce.examples.MRRSleepJob.MRRSleepJobPartitioner;
@@ -396,23 +396,23 @@ public class TestMRRJobsDAGApi {
Edge edge1 = new Edge(stage1Vertex, stage2Vertex, new EdgeProperty(
DataMovementType.SCATTER_GATHER, DataSourceType.PERSISTED,
SchedulingType.SEQUENTIAL, new OutputDescriptor(
- OnFileSortedOutput.class.getName()), new InputDescriptor(
- ShuffledMergedInput.class.getName())));
+ OldOnFileSortedOutput.class.getName()), new InputDescriptor(
+ OldShuffledMergedInput.class.getName())));
Edge edge11 = new Edge(stage11Vertex, stage22Vertex, new EdgeProperty(
DataMovementType.SCATTER_GATHER, DataSourceType.PERSISTED,
SchedulingType.SEQUENTIAL, new OutputDescriptor(
- OnFileSortedOutput.class.getName()), new InputDescriptor(
- ShuffledMergedInput.class.getName())));
+ OldOnFileSortedOutput.class.getName()), new InputDescriptor(
+ OldShuffledMergedInput.class.getName())));
Edge edge2 = new Edge(stage2Vertex, stage3Vertex, new EdgeProperty(
DataMovementType.SCATTER_GATHER, DataSourceType.PERSISTED,
SchedulingType.SEQUENTIAL, new OutputDescriptor(
- OnFileSortedOutput.class.getName()), new InputDescriptor(
- ShuffledMergedInput.class.getName())));
+ OldOnFileSortedOutput.class.getName()), new InputDescriptor(
+ OldShuffledMergedInput.class.getName())));
Edge edge3 = new Edge(stage22Vertex, stage3Vertex, new EdgeProperty(
DataMovementType.SCATTER_GATHER, DataSourceType.PERSISTED,
SchedulingType.SEQUENTIAL, new OutputDescriptor(
- OnFileSortedOutput.class.getName()), new InputDescriptor(
- ShuffledMergedInput.class.getName())));
+ OldOnFileSortedOutput.class.getName()), new InputDescriptor(
+ OldShuffledMergedInput.class.getName())));
dag.addEdge(edge1);
dag.addEdge(edge11);
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1cf7f197/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/LocalJobRunnerTez.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/LocalJobRunnerTez.java b/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/LocalJobRunnerTez.java
index 204f517..7df783b 100644
--- a/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/LocalJobRunnerTez.java
+++ b/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/LocalJobRunnerTez.java
@@ -76,8 +76,8 @@ import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.engine.api.Task;
import org.apache.tez.engine.common.task.local.output.TezLocalTaskOutputFiles;
import org.apache.tez.engine.common.task.local.output.TezTaskOutput;
-import org.apache.tez.engine.lib.input.LocalMergedInput;
-import org.apache.tez.engine.lib.output.LocalOnFileSorterOutput;
+import org.apache.tez.engine.lib.oldinput.LocalMergedInput;
+import org.apache.tez.engine.lib.oldoutput.OldLocalOnFileSorterOutput;
import org.apache.tez.engine.newapi.impl.TezEvent;
import org.apache.tez.engine.newapi.impl.TezHeartbeatRequest;
import org.apache.tez.engine.newapi.impl.TezHeartbeatResponse;
@@ -257,7 +257,7 @@ public class LocalJobRunnerTez implements ClientProtocol {
Collections.singletonList(new InputSpec("srcVertex", 0,
SimpleInput.class.getName())),
Collections.singletonList(new OutputSpec("tgtVertex", 0,
- LocalOnFileSorterOutput.class.getName())));
+ OldLocalOnFileSorterOutput.class.getName())));
TezTaskOutput mapOutput = new TezLocalTaskOutputFiles();
mapOutput.setConf(localConf);
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1cf7f197/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/reduce/ReduceProcessor.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/reduce/ReduceProcessor.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/reduce/ReduceProcessor.java
index 5793104..4fb1876 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/reduce/ReduceProcessor.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/reduce/ReduceProcessor.java
@@ -48,7 +48,7 @@ import org.apache.tez.engine.api.Processor;
import org.apache.tez.engine.common.ConfigUtils;
import org.apache.tez.engine.common.sort.SortingOutput;
import org.apache.tez.engine.common.sort.impl.TezRawKeyValueIterator;
-import org.apache.tez.engine.lib.input.ShuffledMergedInput;
+import org.apache.tez.engine.lib.oldinput.OldShuffledMergedInput;
import org.apache.tez.mapreduce.hadoop.mapred.TaskAttemptContextImpl;
import org.apache.tez.mapreduce.input.SimpleInput;
import org.apache.tez.mapreduce.output.SimpleOutput;
@@ -108,12 +108,12 @@ implements Processor {
if (in instanceof SimpleInput) {
((SimpleInput)in).setTask(this);
- } else if (in instanceof ShuffledMergedInput) {
- ((ShuffledMergedInput)in).setTask(this);
+ } else if (in instanceof OldShuffledMergedInput) {
+ ((OldShuffledMergedInput)in).setTask(this);
}
if(ins.length > 1) {
- if (!(in instanceof ShuffledMergedInput)) {
+ if (!(in instanceof OldShuffledMergedInput)) {
throw new IOException(
"Only ShuffledMergedInput can support multiple inputs"
+ ". inputCount=" + ins.length);
@@ -124,15 +124,15 @@ implements Processor {
+ ins.length + " From contex:" + inputs.size());
}
// initialize and merge the remaining
- ShuffledMergedInput s0 = ((ShuffledMergedInput)in);
+ OldShuffledMergedInput s0 = ((OldShuffledMergedInput)in);
for(int i=1; i<ins.length; ++i) {
Input inputi = ins[i];
- if (!(inputi instanceof ShuffledMergedInput)) {
+ if (!(inputi instanceof OldShuffledMergedInput)) {
throw new IOException(
"Only ShuffledMergedInput can support multiple inputs"
+ ". inputCount=" + ins.length);
}
- ShuffledMergedInput si = ((ShuffledMergedInput)inputi);
+ OldShuffledMergedInput si = ((OldShuffledMergedInput)inputi);
s0.mergeWith(si);
}
}
@@ -162,10 +162,10 @@ implements Processor {
reporter.getCounter(TaskCounter.REDUCE_INPUT_RECORDS);
// Sanity check
- if (!(in instanceof ShuffledMergedInput)) {
+ if (!(in instanceof OldShuffledMergedInput)) {
throw new IOException("Illegal input to reduce: " + in.getClass());
}
- ShuffledMergedInput shuffleInput = (ShuffledMergedInput)in;
+ OldShuffledMergedInput shuffleInput = (OldShuffledMergedInput)in;
if (useNewApi) {
try {
@@ -194,7 +194,7 @@ implements Processor {
void runOldReducer(JobConf job,
TezTaskUmbilicalProtocol umbilical,
final MRTaskReporter reporter,
- ShuffledMergedInput input,
+ OldShuffledMergedInput input,
RawComparator comparator,
Class keyClass,
Class valueClass,
@@ -265,7 +265,7 @@ implements Processor {
private Counter reduceInputValueCounter;
private Progress reducePhase;
- public ReduceValuesIterator (ShuffledMergedInput in,
+ public ReduceValuesIterator (OldShuffledMergedInput in,
RawComparator<KEY> comparator,
Class<KEY> keyClass,
Class<VALUE> valClass,
@@ -297,7 +297,7 @@ implements Processor {
void runNewReducer(JobConf job,
final TezTaskUmbilicalProtocol umbilical,
final MRTaskReporter reporter,
- ShuffledMergedInput input,
+ OldShuffledMergedInput input,
RawComparator comparator,
Class keyClass,
Class valueClass,
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1cf7f197/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/map/TestMapProcessor.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/map/TestMapProcessor.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/map/TestMapProcessor.java
index ae62251..3610f9f 100644
--- a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/map/TestMapProcessor.java
+++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/map/TestMapProcessor.java
@@ -38,8 +38,7 @@ import org.apache.tez.engine.api.Task;
import org.apache.tez.engine.common.sort.impl.IFile;
import org.apache.tez.engine.common.task.local.output.TezLocalTaskOutputFiles;
import org.apache.tez.engine.common.task.local.output.TezTaskOutput;
-import org.apache.tez.engine.lib.output.InMemorySortedOutput;
-import org.apache.tez.engine.lib.output.LocalOnFileSorterOutput;
+import org.apache.tez.engine.lib.oldoutput.OldLocalOnFileSorterOutput;
import org.apache.tez.mapreduce.TestUmbilicalProtocol;
import org.apache.tez.mapreduce.hadoop.MultiStageMRConfToTezTranslator;
import org.apache.tez.mapreduce.hadoop.MultiStageMRConfigUtil;
@@ -47,14 +46,9 @@ import org.apache.tez.mapreduce.hadoop.mapreduce.TezNullOutputCommitter;
import org.apache.tez.mapreduce.input.SimpleInput;
import org.apache.tez.mapreduce.processor.MRTask;
import org.apache.tez.mapreduce.processor.MapUtils;
-import org.jboss.netty.buffer.BigEndianHeapChannelBuffer;
-import org.jboss.netty.buffer.ChannelBuffer;
-import org.jboss.netty.buffer.TruncatedChannelBuffer;
-import org.jboss.netty.handler.stream.ChunkedStream;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
-import org.junit.Ignore;
import org.junit.Test;
@SuppressWarnings("deprecation")
@@ -122,7 +116,7 @@ public class TestMapProcessor {
Collections.singletonList(new InputSpec("NullVertex", 0,
SimpleInput.class.getName())),
Collections.singletonList(new OutputSpec("FakeVertex", 1,
- LocalOnFileSorterOutput.class.getName())));
+ OldLocalOnFileSorterOutput.class.getName())));
MRTask mrTask = (MRTask)t.getProcessor();
Assert.assertEquals(TezNullOutputCommitter.class.getName(), mrTask
@@ -151,76 +145,76 @@ public class TestMapProcessor {
reader.close();
}
- @Test
- @Ignore
- public void testMapProcessorWithInMemSort() throws Exception {
-
- String vertexName = MultiStageMRConfigUtil.getInitialMapVertexName();
-
- final int partitions = 2;
- JobConf jobConf = new JobConf(defaultConf);
- jobConf.setNumReduceTasks(partitions);
- setUpJobConf(jobConf);
- TezTaskOutput mapOutputs = new TezLocalTaskOutputFiles();
- mapOutputs.setConf(jobConf);
-
- Configuration conf = MultiStageMRConfToTezTranslator.convertMRToLinearTez(jobConf);
- Configuration stageConf = MultiStageMRConfigUtil.getConfForVertex(conf,
- vertexName);
-
- JobConf job = new JobConf(stageConf);
-
- job.set(TezJobConfig.TASK_LOCAL_RESOURCE_DIR, new Path(workDir,
- "localized-resources").toUri().toString());
- localFs.delete(workDir, true);
- Task t =
- MapUtils.runMapProcessor(
- localFs, workDir, job, 0, new Path(workDir, "map0"),
- new TestUmbilicalProtocol(true), vertexName,
- Collections.singletonList(new InputSpec("NullVertex", 0,
- SimpleInput.class.getName())),
- Collections.singletonList(new OutputSpec("FakeVertex", 1,
- InMemorySortedOutput.class.getName()))
- );
- InMemorySortedOutput[] outputs = (InMemorySortedOutput[])t.getOutputs();
-
- verifyInMemSortedStream(outputs[0], 0, 4096);
- int i = 0;
- for (i = 2; i < 256; i <<= 1) {
- verifyInMemSortedStream(outputs[0], 0, i);
- }
- verifyInMemSortedStream(outputs[0], 1, 4096);
- for (i = 2; i < 256; i <<= 1) {
- verifyInMemSortedStream(outputs[0], 1, i);
- }
-
- t.close();
- }
-
- private void verifyInMemSortedStream(
- InMemorySortedOutput output, int partition, int chunkSize)
- throws Exception {
- ChunkedStream cs =
- new ChunkedStream(
- output.getSorter().getSortedStream(partition), chunkSize);
- int actualBytes = 0;
- ChannelBuffer b = null;
- while ((b = (ChannelBuffer)cs.nextChunk()) != null) {
- LOG.info("b = " + b);
- actualBytes +=
- (b instanceof TruncatedChannelBuffer) ?
- ((TruncatedChannelBuffer)b).capacity() :
- ((BigEndianHeapChannelBuffer)b).readableBytes();
- }
-
- LOG.info("verifyInMemSortedStream" +
- " partition=" + partition +
- " chunkSize=" + chunkSize +
- " expected=" +
- output.getSorter().getShuffleHeader(partition).getCompressedLength() +
- " actual=" + actualBytes);
- Assert.assertEquals(
- output.getSorter().getShuffleHeader(partition).getCompressedLength(),
- actualBytes);
- }
+// @Test
+// @Ignore
+// public void testMapProcessorWithInMemSort() throws Exception {
+//
+// String vertexName = MultiStageMRConfigUtil.getInitialMapVertexName();
+//
+// final int partitions = 2;
+// JobConf jobConf = new JobConf(defaultConf);
+// jobConf.setNumReduceTasks(partitions);
+// setUpJobConf(jobConf);
+// TezTaskOutput mapOutputs = new TezLocalTaskOutputFiles();
+// mapOutputs.setConf(jobConf);
+//
+// Configuration conf = MultiStageMRConfToTezTranslator.convertMRToLinearTez(jobConf);
+// Configuration stageConf = MultiStageMRConfigUtil.getConfForVertex(conf,
+// vertexName);
+//
+// JobConf job = new JobConf(stageConf);
+//
+// job.set(TezJobConfig.TASK_LOCAL_RESOURCE_DIR, new Path(workDir,
+// "localized-resources").toUri().toString());
+// localFs.delete(workDir, true);
+// Task t =
+// MapUtils.runMapProcessor(
+// localFs, workDir, job, 0, new Path(workDir, "map0"),
+// new TestUmbilicalProtocol(true), vertexName,
+// Collections.singletonList(new InputSpec("NullVertex", 0,
+// SimpleInput.class.getName())),
+// Collections.singletonList(new OutputSpec("FakeVertex", 1,
+// OldInMemorySortedOutput.class.getName()))
+// );
+// OldInMemorySortedOutput[] outputs = (OldInMemorySortedOutput[])t.getOutputs();
+//
+// verifyInMemSortedStream(outputs[0], 0, 4096);
+// int i = 0;
+// for (i = 2; i < 256; i <<= 1) {
+// verifyInMemSortedStream(outputs[0], 0, i);
+// }
+// verifyInMemSortedStream(outputs[0], 1, 4096);
+// for (i = 2; i < 256; i <<= 1) {
+// verifyInMemSortedStream(outputs[0], 1, i);
+// }
+//
+// t.close();
+// }
+//
+// private void verifyInMemSortedStream(
+// OldInMemorySortedOutput output, int partition, int chunkSize)
+// throws Exception {
+// ChunkedStream cs =
+// new ChunkedStream(
+// output.getSorter().getSortedStream(partition), chunkSize);
+// int actualBytes = 0;
+// ChannelBuffer b = null;
+// while ((b = (ChannelBuffer)cs.nextChunk()) != null) {
+// LOG.info("b = " + b);
+// actualBytes +=
+// (b instanceof TruncatedChannelBuffer) ?
+// ((TruncatedChannelBuffer)b).capacity() :
+// ((BigEndianHeapChannelBuffer)b).readableBytes();
+// }
+//
+// LOG.info("verifyInMemSortedStream" +
+// " partition=" + partition +
+// " chunkSize=" + chunkSize +
+// " expected=" +
+// output.getSorter().getShuffleHeader(partition).getCompressedLength() +
+// " actual=" + actualBytes);
+// Assert.assertEquals(
+// output.getSorter().getShuffleHeader(partition).getCompressedLength(),
+// actualBytes);
+// }
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1cf7f197/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java
index 2428000..2a121a6 100644
--- a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java
+++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java
@@ -40,8 +40,8 @@ import org.apache.tez.dag.api.ProcessorDescriptor;
import org.apache.tez.engine.api.Task;
import org.apache.tez.engine.common.task.local.output.TezLocalTaskOutputFiles;
import org.apache.tez.engine.common.task.local.output.TezTaskOutput;
-import org.apache.tez.engine.lib.input.LocalMergedInput;
-import org.apache.tez.engine.lib.output.LocalOnFileSorterOutput;
+import org.apache.tez.engine.lib.oldinput.LocalMergedInput;
+import org.apache.tez.engine.lib.oldoutput.OldLocalOnFileSorterOutput;
import org.apache.tez.engine.runtime.RuntimeUtils;
import org.apache.tez.mapreduce.TestUmbilicalProtocol;
import org.apache.tez.mapreduce.TezTestUtils;
@@ -122,7 +122,7 @@ public class TestReduceProcessor {
Collections.singletonList(new InputSpec("NullVertex", 0,
SimpleInput.class.getName())),
Collections.singletonList(new OutputSpec("FakeVertex", 1,
- LocalOnFileSorterOutput.class.getName())));
+ OldLocalOnFileSorterOutput.class.getName())));
LOG.info("Starting reduce...");
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1cf7f197/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/YARNRunner.java
----------------------------------------------------------------------
diff --git a/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/YARNRunner.java b/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/YARNRunner.java
index 0fb823f..2d59b18 100644
--- a/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/YARNRunner.java
+++ b/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/YARNRunner.java
@@ -95,8 +95,8 @@ import org.apache.tez.dag.api.Vertex;
import org.apache.tez.dag.api.VertexLocationHint.TaskLocationHint;
import org.apache.tez.dag.api.client.DAGClient;
import org.apache.tez.dag.api.client.DAGStatus;
-import org.apache.tez.engine.lib.input.ShuffledMergedInput;
-import org.apache.tez.engine.lib.output.OnFileSortedOutput;
+import org.apache.tez.engine.lib.oldinput.OldShuffledMergedInput;
+import org.apache.tez.engine.lib.oldoutput.OldOnFileSortedOutput;
import org.apache.tez.mapreduce.hadoop.DeprecatedKeys;
import org.apache.tez.mapreduce.hadoop.MRHelpers;
import org.apache.tez.mapreduce.hadoop.MRJobConfig;
@@ -463,8 +463,8 @@ public class YARNRunner implements ClientProtocol {
EdgeProperty edgeProperty = new EdgeProperty(
DataMovementType.SCATTER_GATHER, DataSourceType.PERSISTED,
SchedulingType.SEQUENTIAL,
- new OutputDescriptor(OnFileSortedOutput.class.getName()),
- new InputDescriptor(ShuffledMergedInput.class.getName()));
+ new OutputDescriptor(OldOnFileSortedOutput.class.getName()),
+ new InputDescriptor(OldShuffledMergedInput.class.getName()));
Edge edge = null;
edge = new Edge(vertices[i - 1], vertices[i], edgeProperty);
[3/4] TEZ-417. Change Shuffle Input/Output to work with the new APIs
(part of TEZ-398). (sseth)
Posted by ss...@apache.org.
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1cf7f197/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/MergeManager.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/MergeManager.java b/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/MergeManager.java
index 16ded35..a5401fa 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/MergeManager.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/MergeManager.java
@@ -25,10 +25,12 @@ import java.util.List;
import java.util.Set;
import java.util.TreeSet;
+
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.ChecksumFileSystem;
import org.apache.hadoop.fs.FileSystem;
@@ -39,26 +41,21 @@ import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.DefaultCodec;
-import org.apache.hadoop.util.Progress;
+import org.apache.hadoop.util.Progressable;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.tez.common.Constants;
import org.apache.tez.common.TezJobConfig;
-import org.apache.tez.common.TezTaskReporter;
import org.apache.tez.common.counters.TezCounter;
-import org.apache.tez.dag.records.TezTaskAttemptID;
-import org.apache.tez.dag.records.TezTaskID;
-import org.apache.tez.engine.api.Input;
-import org.apache.tez.engine.api.Output;
-import org.apache.tez.engine.api.Processor;
import org.apache.tez.engine.common.ConfigUtils;
-import org.apache.tez.engine.common.combine.CombineInput;
-import org.apache.tez.engine.common.combine.CombineOutput;
import org.apache.tez.engine.common.sort.impl.IFile;
import org.apache.tez.engine.common.sort.impl.TezMerger;
import org.apache.tez.engine.common.sort.impl.TezRawKeyValueIterator;
import org.apache.tez.engine.common.sort.impl.IFile.Writer;
import org.apache.tez.engine.common.sort.impl.TezMerger.Segment;
-import org.apache.tez.engine.common.task.local.output.TezTaskOutputFiles;
+import org.apache.tez.engine.common.task.local.newoutput.TezTaskOutputFiles;
+import org.apache.tez.engine.hadoop.compat.NullProgressable;
+import org.apache.tez.engine.newapi.Processor;
+import org.apache.tez.engine.newapi.TezInputContext;
@InterfaceAudience.Private
@InterfaceStability.Unstable
@@ -66,15 +63,15 @@ import org.apache.tez.engine.common.task.local.output.TezTaskOutputFiles;
public class MergeManager {
private static final Log LOG = LogFactory.getLog(MergeManager.class);
-
- private final TezTaskAttemptID taskAttemptId;
-
+
private final Configuration conf;
private final FileSystem localFS;
private final FileSystem rfs;
private final LocalDirAllocator localDirAllocator;
private final TezTaskOutputFiles mapOutputFile;
+ private final Progressable nullProgressable = new NullProgressable();
+ private final Processor combineProcessor = null; // TODO NEWTEZ Fix CombineProcessor
Set<MapOutput> inMemoryMergedMapOutputs =
new TreeSet<MapOutput>(new MapOutput.MapOutputComparator());
@@ -97,13 +94,15 @@ public class MergeManager {
private final int ioSortFactor;
- private final TezTaskReporter reporter;
private final ExceptionReporter exceptionReporter;
+ private final TezInputContext inputContext;
+
/**
* Combiner processor to run during in-memory merge, if defined.
*/
- private final Processor combineProcessor;
+ // TODO NEWTEZ Fix Combiner
+ //private final Processor combineProcessor;
private final TezCounter spilledRecordsCounter;
@@ -113,31 +112,28 @@ public class MergeManager {
private final CompressionCodec codec;
- private final Progress mergePhase;
+ private volatile boolean finalMergeComplete = false;
- public MergeManager(TezTaskAttemptID taskAttemptId,
- Configuration conf,
+ public MergeManager(Configuration conf,
FileSystem localFS,
LocalDirAllocator localDirAllocator,
- TezTaskReporter reporter,
+ TezInputContext inputContext,
Processor combineProcessor,
TezCounter spilledRecordsCounter,
TezCounter reduceCombineInputCounter,
TezCounter mergedMapOutputsCounter,
- ExceptionReporter exceptionReporter,
- Progress mergePhase) {
- this.taskAttemptId = taskAttemptId;
+ ExceptionReporter exceptionReporter) {
+ // TODO NEWTEZ Change to include Combiner
+ this.inputContext = inputContext;
this.conf = conf;
this.localDirAllocator = localDirAllocator;
this.exceptionReporter = exceptionReporter;
- this.reporter = reporter;
- this.combineProcessor = combineProcessor;
+ //this.combineProcessor = combineProcessor;
this.reduceCombineInputCounter = reduceCombineInputCounter;
this.spilledRecordsCounter = spilledRecordsCounter;
this.mergedMapOutputsCounter = mergedMapOutputsCounter;
- this.mapOutputFile = new TezTaskOutputFiles();
- this.mapOutputFile.setConf(conf);
+ this.mapOutputFile = new TezTaskOutputFiles(conf, inputContext.getUniqueIdentifier());
this.localFS = localFS;
this.rfs = ((LocalFileSystem)localFS).getRaw();
@@ -224,13 +220,6 @@ public class MergeManager {
this.onDiskMerger = new OnDiskMerger(this);
this.onDiskMerger.start();
-
- this.mergePhase = mergePhase;
- }
-
-
- TezTaskAttemptID getReduceId() {
- return taskAttemptId;
}
public void waitForInMemoryMerge() throws InterruptedException {
@@ -240,18 +229,18 @@ public class MergeManager {
private boolean canShuffleToMemory(long requestedSize) {
return (requestedSize < maxSingleShuffleLimit);
}
-
+
final private MapOutput stallShuffle = new MapOutput(null);
- public synchronized MapOutput reserve(TezTaskAttemptID mapId,
+ public synchronized MapOutput reserve(TaskAttemptIdentifier srcAttemptIdentifier,
long requestedSize,
int fetcher
) throws IOException {
if (!canShuffleToMemory(requestedSize)) {
- LOG.info(mapId + ": Shuffling to disk since " + requestedSize +
+ LOG.info(srcAttemptIdentifier + ": Shuffling to disk since " + requestedSize +
" is greater than maxSingleShuffleLimit (" +
maxSingleShuffleLimit + ")");
- return new MapOutput(mapId, this, requestedSize, conf,
+ return new MapOutput(srcAttemptIdentifier, this, requestedSize, conf,
localDirAllocator, fetcher, true,
mapOutputFile);
}
@@ -272,17 +261,17 @@ public class MergeManager {
// all the stalled threads
if (usedMemory > memoryLimit) {
- LOG.debug(mapId + ": Stalling shuffle since usedMemory (" + usedMemory
+ LOG.debug(srcAttemptIdentifier + ": Stalling shuffle since usedMemory (" + usedMemory
+ ") is greater than memoryLimit (" + memoryLimit + ")." +
" CommitMemory is (" + commitMemory + ")");
return stallShuffle;
}
// Allow the in-memory shuffle to progress
- LOG.debug(mapId + ": Proceeding with shuffle since usedMemory ("
+ LOG.debug(srcAttemptIdentifier + ": Proceeding with shuffle since usedMemory ("
+ usedMemory + ") is lesser than memoryLimit (" + memoryLimit + ")."
+ "CommitMemory is (" + commitMemory + ")");
- return unconditionalReserve(mapId, requestedSize, true);
+ return unconditionalReserve(srcAttemptIdentifier, requestedSize, true);
}
/**
@@ -290,9 +279,9 @@ public class MergeManager {
* @return
*/
private synchronized MapOutput unconditionalReserve(
- TezTaskAttemptID mapId, long requestedSize, boolean primaryMapOutput) {
+ TaskAttemptIdentifier srcAttemptIdentifier, long requestedSize, boolean primaryMapOutput) {
usedMemory += requestedSize;
- return new MapOutput(mapId, this, (int)requestedSize,
+ return new MapOutput(srcAttemptIdentifier, this, (int)requestedSize,
primaryMapOutput);
}
@@ -349,6 +338,18 @@ public class MergeManager {
}
}
}
+
+ /**
+ * Should <b>only</b> be used after the Shuffle phase is complete; otherwise it can
+ * return an invalid state, since a merge may not be in progress due to
+ * inadequate inputs.
+ *
+ * @return true if the merge process is complete, otherwise false
+ */
+ @Private
+ public boolean isMergeComplete() {
+ return finalMergeComplete;
+ }
public TezRawKeyValueIterator close() throws Throwable {
// Wait for on-going merges to complete
@@ -362,28 +363,32 @@ public class MergeManager {
new ArrayList<MapOutput>(inMemoryMergedMapOutputs);
memory.addAll(inMemoryMapOutputs);
List<Path> disk = new ArrayList<Path>(onDiskMapOutputs);
- return finalMerge(conf, rfs, memory, disk);
+ TezRawKeyValueIterator kvIter = finalMerge(conf, rfs, memory, disk);
+ this.finalMergeComplete = true;
+ return kvIter;
}
void runCombineProcessor(TezRawKeyValueIterator kvIter, Writer writer)
throws IOException, InterruptedException {
- CombineInput combineIn = new CombineInput(kvIter);
- combineIn.initialize(conf, reporter);
+ // TODO NEWTEZ Fix CombineProcessor
- CombineOutput combineOut = new CombineOutput(writer);
- combineOut.initialize(conf, reporter);
-
- try {
- combineProcessor.process(new Input[] {combineIn},
- new Output[] {combineOut});
- } catch (IOException ioe) {
- try {
- combineProcessor.close();
- } catch (IOException ignoredException) {}
-
- throw ioe;
- }
+// CombineInput combineIn = new CombineInput(kvIter);
+// combineIn.initialize(conf, reporter);
+//
+// CombineOutput combineOut = new CombineOutput(writer);
+// combineOut.initialize(conf, reporter);
+//
+// try {
+// combineProcessor.process(new Input[] {combineIn},
+// new Output[] {combineOut});
+// } catch (IOException ioe) {
+// try {
+// combineProcessor.close();
+// } catch (IOException ignoredException) {}
+//
+// throw ioe;
+// }
}
@@ -404,7 +409,7 @@ public class MergeManager {
return;
}
- TezTaskAttemptID dummyMapId = inputs.get(0).getMapId();
+ TaskAttemptIdentifier dummyMapId = inputs.get(0).getAttemptIdentifier();
List<Segment> inMemorySegments = new ArrayList<Segment>();
long mergeOutputSize =
createInMemorySegments(inputs, inMemorySegments, 0);
@@ -424,13 +429,13 @@ public class MergeManager {
ConfigUtils.getIntermediateInputKeyClass(conf),
ConfigUtils.getIntermediateInputValueClass(conf),
inMemorySegments, inMemorySegments.size(),
- new Path(taskAttemptId.toString()),
+ new Path(inputContext.getUniqueIdentifier()),
(RawComparator)ConfigUtils.getIntermediateInputKeyComparator(conf),
- reporter, null, null, null);
- TezMerger.writeFile(rIter, writer, reporter, conf);
+ nullProgressable, null, null, null);
+ TezMerger.writeFile(rIter, writer, nullProgressable, conf);
writer.close();
- LOG.info(taskAttemptId +
+ LOG.info(inputContext.getUniqueIdentifier() +
" Memory-to-Memory merge of the " + noInMemorySegments +
" files in-memory complete.");
@@ -463,8 +468,7 @@ public class MergeManager {
//in the merge method)
//figure out the mapId
- TezTaskAttemptID mapId = inputs.get(0).getMapId();
- TezTaskID mapTaskId = mapId.getTaskID();
+ TaskAttemptIdentifier srcTaskIdentifier = inputs.get(0).getAttemptIdentifier();
List<Segment> inMemorySegments = new ArrayList<Segment>();
long mergeOutputSize =
@@ -472,7 +476,7 @@ public class MergeManager {
int noInMemorySegments = inMemorySegments.size();
Path outputPath =
- mapOutputFile.getInputFileForWrite(mapTaskId,
+ mapOutputFile.getInputFileForWrite(srcTaskIdentifier.getTaskIndex(),
mergeOutputSize).suffix(
Constants.MERGED_OUTPUT_PREFIX);
@@ -492,19 +496,19 @@ public class MergeManager {
(Class)ConfigUtils.getIntermediateInputKeyClass(conf),
(Class)ConfigUtils.getIntermediateInputValueClass(conf),
inMemorySegments, inMemorySegments.size(),
- new Path(taskAttemptId.toString()),
+ new Path(inputContext.getUniqueIdentifier()),
(RawComparator)ConfigUtils.getIntermediateInputKeyComparator(conf),
- reporter, spilledRecordsCounter, null, null);
+ nullProgressable, spilledRecordsCounter, null, null);
if (null == combineProcessor) {
- TezMerger.writeFile(rIter, writer, reporter, conf);
+ TezMerger.writeFile(rIter, writer, nullProgressable, conf);
} else {
runCombineProcessor(rIter, writer);
}
writer.close();
writer = null;
- LOG.info(taskAttemptId +
+ LOG.info(inputContext.getUniqueIdentifier() +
" Merge of the " + noInMemorySegments +
" files in-memory complete." +
" Local file is " + outputPath + " of size " +
@@ -568,7 +572,7 @@ public class MergeManager {
(Class)ConfigUtils.getIntermediateInputValueClass(conf),
codec, null);
TezRawKeyValueIterator iter = null;
- Path tmpDir = new Path(taskAttemptId.toString());
+ Path tmpDir = new Path(inputContext.getUniqueIdentifier());
try {
iter = TezMerger.merge(conf, rfs,
(Class)ConfigUtils.getIntermediateInputKeyClass(conf),
@@ -576,10 +580,10 @@ public class MergeManager {
codec, inputs.toArray(new Path[inputs.size()]),
true, ioSortFactor, tmpDir,
(RawComparator)ConfigUtils.getIntermediateInputKeyComparator(conf),
- reporter, spilledRecordsCounter, null,
+ nullProgressable, spilledRecordsCounter, null,
mergedMapOutputsCounter, null);
- TezMerger.writeFile(iter, writer, reporter, conf);
+ TezMerger.writeFile(iter, writer, nullProgressable, conf);
writer.close();
} catch (IOException e) {
localFS.delete(outputPath, true);
@@ -588,7 +592,7 @@ public class MergeManager {
closeOnDiskFile(outputPath);
- LOG.info(taskAttemptId +
+ LOG.info(inputContext.getUniqueIdentifier() +
" Finished merging " + inputs.size() +
" map output files on disk of total-size " +
approxOutputSize + "." +
@@ -615,7 +619,7 @@ public class MergeManager {
totalSize += size;
fullSize -= size;
IFile.Reader reader = new InMemoryReader(MergeManager.this,
- mo.getMapId(),
+ mo.getAttemptIdentifier(),
data, 0, (int)size);
inMemorySegments.add(new Segment(reader, true,
(mo.isPrimaryMapOutput() ?
@@ -683,7 +687,7 @@ public class MergeManager {
// merge config params
Class keyClass = (Class)ConfigUtils.getIntermediateInputKeyClass(job);
Class valueClass = (Class)ConfigUtils.getIntermediateInputValueClass(job);
- final Path tmpDir = new Path(taskAttemptId.toString());
+ final Path tmpDir = new Path(inputContext.getUniqueIdentifier());
final RawComparator comparator =
(RawComparator)ConfigUtils.getIntermediateInputKeyComparator(job);
@@ -692,7 +696,7 @@ public class MergeManager {
long inMemToDiskBytes = 0;
boolean mergePhaseFinished = false;
if (inMemoryMapOutputs.size() > 0) {
- TezTaskID mapId = inMemoryMapOutputs.get(0).getMapId().getTaskID();
+ int srcTaskId = inMemoryMapOutputs.get(0).getAttemptIdentifier().getTaskIndex();
inMemToDiskBytes = createInMemorySegments(inMemoryMapOutputs,
memDiskSegments,
maxInMemReduce);
@@ -710,17 +714,16 @@ public class MergeManager {
mergePhaseFinished = true;
// must spill to disk, but can't retain in-mem for intermediate merge
final Path outputPath =
- mapOutputFile.getInputFileForWrite(mapId,
+ mapOutputFile.getInputFileForWrite(srcTaskId,
inMemToDiskBytes).suffix(
Constants.MERGED_OUTPUT_PREFIX);
final TezRawKeyValueIterator rIter = TezMerger.merge(job, fs,
keyClass, valueClass, memDiskSegments, numMemDiskSegments,
- tmpDir, comparator, reporter, spilledRecordsCounter, null,
- mergePhase);
+ tmpDir, comparator, nullProgressable, spilledRecordsCounter, null, null);
final Writer writer = new Writer(job, fs, outputPath,
keyClass, valueClass, codec, null);
try {
- TezMerger.writeFile(rIter, writer, reporter, job);
+ TezMerger.writeFile(rIter, writer, nullProgressable, job);
// add to list of final disk outputs.
onDiskMapOutputs.add(outputPath);
} catch (IOException e) {
@@ -784,13 +787,10 @@ public class MergeManager {
final int numInMemSegments = memDiskSegments.size();
diskSegments.addAll(0, memDiskSegments);
memDiskSegments.clear();
- // Pass mergePhase only if there is a going to be intermediate
- // merges. See comment where mergePhaseFinished is being set
- Progress thisPhase = (mergePhaseFinished) ? null : mergePhase;
TezRawKeyValueIterator diskMerge = TezMerger.merge(
job, fs, keyClass, valueClass, diskSegments,
ioSortFactor, numInMemSegments, tmpDir, comparator,
- reporter, false, spilledRecordsCounter, null, thisPhase);
+ nullProgressable, false, spilledRecordsCounter, null, null);
diskSegments.clear();
if (0 == finalSegments.size()) {
return diskMerge;
@@ -800,7 +800,7 @@ public class MergeManager {
}
return TezMerger.merge(job, fs, keyClass, valueClass,
finalSegments, finalSegments.size(), tmpDir,
- comparator, reporter, spilledRecordsCounter, null,
+ comparator, nullProgressable, spilledRecordsCounter, null,
null);
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1cf7f197/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/Shuffle.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/Shuffle.java b/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/Shuffle.java
index 69dd036..9dd213e 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/Shuffle.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/Shuffle.java
@@ -18,6 +18,10 @@
package org.apache.tez.engine.common.shuffle.impl;
import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.FutureTask;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.logging.Log;
@@ -27,18 +31,16 @@ import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalDirAllocator;
-import org.apache.hadoop.util.Progress;
-import org.apache.tez.common.RunningTaskContext;
-import org.apache.tez.common.TezEngineTaskContext;
+import org.apache.hadoop.security.UserGroupInformation;
import org.apache.tez.common.TezJobConfig;
-import org.apache.tez.common.TezTaskReporter;
-import org.apache.tez.common.TezTaskStatus;
-import org.apache.tez.common.TezUtils;
-import org.apache.tez.common.counters.TezCounter;
import org.apache.tez.common.counters.TaskCounter;
+import org.apache.tez.common.counters.TezCounter;
import org.apache.tez.dag.api.TezUncheckedException;
-import org.apache.tez.engine.api.Processor;
import org.apache.tez.engine.common.sort.impl.TezRawKeyValueIterator;
+import org.apache.tez.engine.newapi.Event;
+import org.apache.tez.engine.newapi.TezInputContext;
+
+import com.google.common.base.Preconditions;
@InterfaceAudience.Private
@InterfaceStability.Unstable
@@ -46,160 +48,176 @@ public class Shuffle implements ExceptionReporter {
private static final Log LOG = LogFactory.getLog(Shuffle.class);
private static final int PROGRESS_FREQUENCY = 2000;
- private static final int MAX_EVENTS_TO_FETCH = 10000;
- private static final int MIN_EVENTS_TO_FETCH = 100;
- private static final int MAX_RPC_OUTSTANDING_EVENTS = 3000000;
-
- private final TezEngineTaskContext taskContext;
- private final RunningTaskContext runningTaskContext;
+
private final Configuration conf;
- private final TezTaskReporter reporter;
+ private final TezInputContext inputContext;
private final ShuffleClientMetrics metrics;
-
+
+ private final ShuffleInputEventHandler eventHandler;
private final ShuffleScheduler scheduler;
private final MergeManager merger;
private Throwable throwable = null;
private String throwingThreadName = null;
- private final Progress copyPhase;
- private final Progress mergePhase;
- private final int tasksInDegree;
+ private final int numInputs;
private final AtomicInteger reduceStartId;
private AtomicInteger reduceRange = new AtomicInteger(
TezJobConfig.TEZ_ENGINE_SHUFFLE_PARTITION_RANGE_DEFAULT);
- public Shuffle(TezEngineTaskContext taskContext,
- RunningTaskContext runningTaskContext,
- Configuration conf,
- int tasksInDegree,
- TezTaskReporter reporter,
- Processor combineProcessor
- ) throws IOException {
- this.taskContext = taskContext;
- this.runningTaskContext = runningTaskContext;
+ // Future for the background shuffle + merge started by run(); null until run() is called.
+ private FutureTask<TezRawKeyValueIterator> runShuffleFuture;
+
+ /**
+ * Creates the scheduler, event handler and merger for this input. No fetcher threads are
+ * started here; that happens once run() is called.
+ * @param inputContext context of the owning Input; supplies counters, the DAG/vertex names
+ * and the task index used for metrics and the reduce start id.
+ * @param conf configuration used by the scheduler, fetchers and merger.
+ * @param numInputs number of source outputs the scheduler waits for (its tasksInDegree).
+ * @throws IOException propagated from the collaborators created here
+ * (e.g. FileSystem.getLocal(), UserGroupInformation.getCurrentUser()).
+ */
+ public Shuffle(TezInputContext inputContext, Configuration conf, int numInputs) throws IOException {
+ // TODO NEWTEZ Handle Combiner
+ this.inputContext = inputContext;
this.conf = conf;
- this.reporter = reporter;
- this.metrics =
- new ShuffleClientMetrics(
- taskContext.getTaskAttemptId(), this.conf,
- this.taskContext.getUser(), this.taskContext.getJobName());
- this.tasksInDegree = tasksInDegree;
+ this.metrics = new ShuffleClientMetrics(inputContext.getDAGName(),
+ inputContext.getTaskVertexName(), inputContext.getTaskIndex(),
+ this.conf, UserGroupInformation.getCurrentUser().getShortUserName());
+
+ this.numInputs = numInputs;
FileSystem localFS = FileSystem.getLocal(this.conf);
LocalDirAllocator localDirAllocator =
new LocalDirAllocator(TezJobConfig.LOCAL_DIRS);
-
- copyPhase = this.runningTaskContext.getProgress().addPhase("copy");
- mergePhase = this.runningTaskContext.getProgress().addPhase("merge");
// TODO TEZ Get rid of Map / Reduce references.
TezCounter shuffledMapsCounter =
- reporter.getCounter(TaskCounter.SHUFFLED_MAPS);
+ inputContext.getCounters().findCounter(TaskCounter.SHUFFLED_MAPS);
TezCounter reduceShuffleBytes =
- reporter.getCounter(TaskCounter.REDUCE_SHUFFLE_BYTES);
+ inputContext.getCounters().findCounter(TaskCounter.REDUCE_SHUFFLE_BYTES);
TezCounter failedShuffleCounter =
- reporter.getCounter(TaskCounter.FAILED_SHUFFLE);
+ inputContext.getCounters().findCounter(TaskCounter.FAILED_SHUFFLE);
TezCounter spilledRecordsCounter =
- reporter.getCounter(TaskCounter.SPILLED_RECORDS);
+ inputContext.getCounters().findCounter(TaskCounter.SPILLED_RECORDS);
TezCounter reduceCombineInputCounter =
- reporter.getCounter(TaskCounter.COMBINE_INPUT_RECORDS);
+ inputContext.getCounters().findCounter(TaskCounter.COMBINE_INPUT_RECORDS);
TezCounter mergedMapOutputsCounter =
- reporter.getCounter(TaskCounter.MERGED_MAP_OUTPUTS);
+ inputContext.getCounters().findCounter(TaskCounter.MERGED_MAP_OUTPUTS);
- reduceStartId = new AtomicInteger(
- taskContext.getTaskAttemptId().getTaskID().getId());
+ // The partition this input starts reading from is its own task index.
+ reduceStartId = new AtomicInteger(inputContext.getTaskIndex());
LOG.info("Shuffle assigned reduce start id: " + reduceStartId.get()
+ " with default reduce range: " + reduceRange.get());
- scheduler =
- new ShuffleScheduler(this.conf, tasksInDegree,
- runningTaskContext.getStatus(),
- this, copyPhase,
- shuffledMapsCounter,
- reduceShuffleBytes,
- failedShuffleCounter);
- merger = new MergeManager(this.taskContext.getTaskAttemptId(),
- this.conf, localFS,
- localDirAllocator, reporter,
- combineProcessor,
- spilledRecordsCounter,
- reduceCombineInputCounter,
- mergedMapOutputsCounter,
- this, mergePhase);
+ scheduler = new ShuffleScheduler(
+ this.inputContext,
+ this.conf,
+ this.numInputs,
+ this,
+ shuffledMapsCounter,
+ reduceShuffleBytes,
+ failedShuffleCounter);
+ // Translates incoming Input events into scheduler / partition-range updates.
+ eventHandler= new ShuffleInputEventHandler(
+ inputContext,
+ this,
+ scheduler);
+ merger = new MergeManager(
+ this.conf,
+ localFS,
+ localDirAllocator,
+ inputContext,
+ null, // TODO NEWTEZ Fix Combiner
+ spilledRecordsCounter,
+ reduceCombineInputCounter,
+ mergedMapOutputsCounter,
+ this);
}
- public TezRawKeyValueIterator run() throws IOException, InterruptedException {
- // Scale the maximum events we fetch per RPC call to mitigate OOM issues
- // on the ApplicationMaster when a thundering herd of reducers fetch events
- // TODO: This should not be necessary after HADOOP-8942
- int eventsPerReducer = Math.max(MIN_EVENTS_TO_FETCH,
- MAX_RPC_OUTSTANDING_EVENTS / tasksInDegree);
- int maxEventsToFetch = Math.min(MAX_EVENTS_TO_FETCH, eventsPerReducer);
+ /**
+ * Passes a batch of events received by the owning Input to the ShuffleInputEventHandler.
+ * @param events the events to process, in order.
+ */
+ public void handleEvents(List<Event> events) {
+ eventHandler.handleEvents(events);
+ }
+
+ /**
+ * Indicates whether the Shuffle and Merge processing is complete.
+ * @return false if not complete (including when run() has not been called yet),
+ * true if complete or if an error occurred.
+ */
+ public boolean isInputReady() {
+ if (runShuffleFuture == null) {
+ return false;
+ }
+ return runShuffleFuture.isDone();
+ }
- // Start the map-completion events fetcher thread
- final EventFetcher eventFetcher =
- new EventFetcher(taskContext.getTaskAttemptId(), reporter, scheduler, this,
- maxEventsToFetch);
- eventFetcher.start();
-
- // Start the map-output fetcher threads
- final int numFetchers =
- conf.getInt(
- TezJobConfig.TEZ_ENGINE_SHUFFLE_PARALLEL_COPIES,
- TezJobConfig.DEFAULT_TEZ_ENGINE_SHUFFLE_PARALLEL_COPIES);
- Fetcher[] fetchers = new Fetcher[numFetchers];
- for (int i=0; i < numFetchers; ++i) {
- fetchers[i] = new Fetcher(conf, scheduler,
- merger, reporter, metrics, this,
- runningTaskContext.getJobTokenSecret());
- fetchers[i].start();
+ /**
+ * Waits for the Shuffle and Merge to complete, and returns an iterator over the input.
+ * @return an iterator over the fetched input.
+ * @throws IOException if the background shuffle/merge failed with an IOException.
+ * @throws InterruptedException if interrupted while waiting, or if the background
+ * shuffle/merge failed with an InterruptedException.
+ * @throws IllegalStateException if run() has not been called.
+ * @throws TezUncheckedException if the background shuffle/merge failed with any other
+ * exception type.
+ */
+ public TezRawKeyValueIterator waitForInput() throws IOException, InterruptedException {
+ Preconditions.checkState(runShuffleFuture != null,
+ "waitForInput can only be called after run");
+ TezRawKeyValueIterator kvIter;
+ try {
+ kvIter = runShuffleFuture.get();
+ } catch (ExecutionException e) {
+ // Unwrap so callers see the same checked exceptions the callable declares.
+ Throwable cause = e.getCause();
+ if (cause instanceof IOException) {
+ throw (IOException) cause;
+ } else if (cause instanceof InterruptedException) {
+ throw (InterruptedException) cause;
+ } else {
+ throw new TezUncheckedException(
+ "Unexpected exception type while running Shuffle and Merge", cause);
+ }
}
-
- // Wait for shuffle to complete successfully
- while (!scheduler.waitUntilDone(PROGRESS_FREQUENCY)) {
- reporter.progress();
+ return kvIter;
+ }
+
+ /**
+ * Starts fetching and merging on a new thread named "ShuffleMergeRunner".
+ * Use isInputReady() / waitForInput() to observe the result.
+ */
+ public void run() {
+ RunShuffleCallable runShuffle = new RunShuffleCallable();
+ runShuffleFuture = new FutureTask<TezRawKeyValueIterator>(runShuffle);
+ new Thread(runShuffleFuture, "ShuffleMergeRunner").start();
+ }
+
+ /**
+ * Starts the fetcher threads, waits for the scheduler to finish, shuts the fetchers
+ * down and performs the final merge.
+ */
+ private class RunShuffleCallable implements Callable<TezRawKeyValueIterator> {
+ @Override
+ public TezRawKeyValueIterator call() throws IOException, InterruptedException {
+ final int numFetchers =
+ conf.getInt(
+ TezJobConfig.TEZ_ENGINE_SHUFFLE_PARALLEL_COPIES,
+ TezJobConfig.DEFAULT_TEZ_ENGINE_SHUFFLE_PARALLEL_COPIES);
+ Fetcher[] fetchers = new Fetcher[numFetchers];
+ for (int i = 0; i < numFetchers; ++i) {
+ fetchers[i] = new Fetcher(conf, scheduler, merger, metrics, Shuffle.this, inputContext);
+ fetchers[i].start();
+ }
- synchronized (this) {
- if (throwable != null) {
- throw new ShuffleError("error in shuffle in " + throwingThreadName,
- throwable);
+ while (!scheduler.waitUntilDone(PROGRESS_FREQUENCY)) {
+ // throwable/throwingThreadName are fields of the enclosing Shuffle; inside this
+ // inner class a bare 'this' would be the RunShuffleCallable, so lock Shuffle.this
+ // (as the final sanity check below does).
+ synchronized (Shuffle.this) {
+ if (throwable != null) {
+ throw new ShuffleError("error in shuffle in " + throwingThreadName,
+ throwable);
+ }
}
}
- }
-
- // Stop the event-fetcher thread
- eventFetcher.shutDown();
-
- // Stop the map-output fetcher threads
- for (Fetcher fetcher : fetchers) {
- fetcher.shutDown();
- }
- fetchers = null;
-
- // stop the scheduler
- scheduler.close();
+
+ // Stop the map-output fetcher threads
+ for (Fetcher fetcher : fetchers) {
+ fetcher.shutDown();
+ }
+ fetchers = null;
+
+ // stop the scheduler
+ scheduler.close();
- copyPhase.complete(); // copy is already complete
- runningTaskContext.getStatus().setPhase(TezTaskStatus.Phase.SORT);
-
- runningTaskContext.statusUpdate();
-
- // Finish the on-going merges...
- TezRawKeyValueIterator kvIter = null;
- try {
- kvIter = merger.close();
- } catch (Throwable e) {
- throw new ShuffleError("Error while doing final merge " , e);
- }
- // Sanity check
- synchronized (this) {
- if (throwable != null) {
- throw new ShuffleError("error in shuffle in " + throwingThreadName,
- throwable);
+ // Finish the on-going merges...
+ TezRawKeyValueIterator kvIter = null;
+ try {
+ kvIter = merger.close();
+ } catch (Throwable e) {
+ throw new ShuffleError("Error while doing final merge " , e);
+ }
+
+ // Sanity check
+ synchronized (Shuffle.this) {
+ if (throwable != null) {
+ throw new ShuffleError("error in shuffle in " + throwingThreadName,
+ throwable);
+ }
}
+ return kvIter;
}
-
- return kvIter;
}
public int getReduceStartId() {
@@ -229,19 +247,8 @@ public class Shuffle implements ExceptionReporter {
super(msg, t);
}
}
-
- public void updateUserPayload(byte[] userPayload) throws IOException {
- if(userPayload == null) {
- return;
- }
- Configuration conf = TezUtils.createConfFromUserPayload(userPayload);
- int reduceRange = conf.getInt(
- TezJobConfig.TEZ_ENGINE_SHUFFLE_PARTITION_RANGE,
- TezJobConfig.TEZ_ENGINE_SHUFFLE_PARTITION_RANGE_DEFAULT);
- setReduceRange(reduceRange);
- }
-
- private void setReduceRange(int range) {
+
+ public void setPartitionRange(int range) {
if (range == reduceRange.get()) {
return;
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1cf7f197/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/ShuffleClientMetrics.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/ShuffleClientMetrics.java b/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/ShuffleClientMetrics.java
index 34b26c4..850dbeb 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/ShuffleClientMetrics.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/ShuffleClientMetrics.java
@@ -24,7 +24,7 @@ import org.apache.hadoop.metrics.MetricsUtil;
import org.apache.hadoop.metrics.Updater;
import org.apache.tez.common.Constants;
import org.apache.tez.common.TezJobConfig;
-import org.apache.tez.dag.records.TezTaskAttemptID;
+import org.apache.tez.engine.common.TezEngineUtils;
class ShuffleClientMetrics implements Updater {
@@ -35,10 +35,10 @@ class ShuffleClientMetrics implements Updater {
private int numThreadsBusy = 0;
private final int numCopiers;
- ShuffleClientMetrics(TezTaskAttemptID reduceId, Configuration jobConf,
- String user, String jobName) {
+ ShuffleClientMetrics(String dagName, String vertexName, int taskIndex, Configuration conf,
+ String user) {
this.numCopiers =
- jobConf.getInt(
+ conf.getInt(
TezJobConfig.TEZ_ENGINE_SHUFFLE_PARALLEL_COPIES,
TezJobConfig.DEFAULT_TEZ_ENGINE_SHUFFLE_PARALLEL_COPIES);
@@ -46,12 +46,10 @@ class ShuffleClientMetrics implements Updater {
this.shuffleMetrics =
MetricsUtil.createRecord(metricsContext, "shuffleInput");
this.shuffleMetrics.setTag("user", user);
- this.shuffleMetrics.setTag("jobName", jobName);
- this.shuffleMetrics.setTag("jobId",
- reduceId.getTaskID().getVertexID().getDAGId().toString());
- this.shuffleMetrics.setTag("taskId", reduceId.toString());
+ this.shuffleMetrics.setTag("dagName", dagName);
+ this.shuffleMetrics.setTag("taskId", TezEngineUtils.getTaskIdentifier(vertexName, taskIndex));
this.shuffleMetrics.setTag("sessionId",
- jobConf.get(
+ conf.get(
TezJobConfig.TEZ_ENGINE_METRICS_SESSION_ID,
TezJobConfig.DEFAULT_TEZ_ENGINE_METRICS_SESSION_ID));
metricsContext.registerUpdater(this);
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1cf7f197/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/ShuffleInputEventHandler.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/ShuffleInputEventHandler.java b/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/ShuffleInputEventHandler.java
new file mode 100644
index 0000000..012103f
--- /dev/null
+++ b/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/ShuffleInputEventHandler.java
@@ -0,0 +1,132 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.engine.common.shuffle.impl;
+
+import java.net.URI;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.tez.dag.api.TezUncheckedException;
+import org.apache.tez.engine.common.shuffle.newimpl.ShuffleUserPayloads.DataMovementEventPayloadProto;
+import org.apache.tez.engine.common.shuffle.newimpl.ShuffleUserPayloads.InputInformationEventPayloadProto;
+import org.apache.tez.engine.newapi.Event;
+import org.apache.tez.engine.newapi.TezInputContext;
+import org.apache.tez.engine.newapi.events.DataMovementEvent;
+import org.apache.tez.engine.newapi.events.InputFailedEvent;
+import org.apache.tez.engine.newapi.events.InputInformationEvent;
+
+import com.google.common.base.Preconditions;
+import com.google.protobuf.InvalidProtocolBufferException;
+
+public class ShuffleInputEventHandler {
+
+ private static final Log LOG = LogFactory.getLog(ShuffleInputEventHandler.class);
+
+ private final ShuffleScheduler scheduler;
+ private final TezInputContext inputContext;
+ private final Shuffle shuffle;
+
+ // Longest source run duration seen so far; forwarded to the scheduler whenever it grows.
+ private int maxMapRuntime = 0;
+ // Set once an InputInformationEvent has supplied the partition range.
+ private boolean shuffleRangeSet = false;
+
+ public ShuffleInputEventHandler(TezInputContext inputContext,
+ Shuffle shuffle, ShuffleScheduler scheduler) {
+ this.inputContext = inputContext;
+ this.shuffle = shuffle;
+ this.scheduler = scheduler;
+ }
+
+ /**
+ * Processes the given events in order.
+ * @param events events delivered to the owning Input.
+ */
+ public void handleEvents(List<Event> events) {
+ for (Event event : events) {
+ handleEvent(event);
+ }
+ }
+
+ /**
+ * Dispatches a single event by type. Event types this handler does not understand are
+ * logged and otherwise ignored, instead of being dropped silently.
+ */
+ private void handleEvent(Event event) {
+ if (event instanceof InputInformationEvent) {
+ processInputInformationEvent((InputInformationEvent) event);
+ } else if (event instanceof DataMovementEvent) {
+ processDataMovementEvent((DataMovementEvent) event);
+ } else if (event instanceof InputFailedEvent) {
+ processTaskFailedEvent((InputFailedEvent) event);
+ } else {
+ LOG.warn("Ignoring unexpected event of type "
+ + (event == null ? "null" : event.getClass().getName()));
+ }
+ }
+
+ /**
+ * Reads the partition range from the event payload and applies it to the Shuffle.
+ * Must arrive before any DataMovementEvent is processed.
+ * @throws TezUncheckedException if the payload cannot be parsed.
+ */
+ private void processInputInformationEvent(InputInformationEvent iiEvent) {
+ InputInformationEventPayloadProto inputInfoPayload;
+ try {
+ inputInfoPayload = InputInformationEventPayloadProto.parseFrom(iiEvent.getUserPayload());
+ } catch (InvalidProtocolBufferException e) {
+ throw new TezUncheckedException("Unable to parse InputInformationEvent payload", e);
+ }
+ int partitionRange = inputInfoPayload.getPartitionRange();
+ shuffle.setPartitionRange(partitionRange);
+ this.shuffleRangeSet = true;
+ }
+
+ /**
+ * Registers the source output described by a DataMovementEvent with the scheduler.
+ * The event's source index is used as the partition id and its target index as the
+ * source task index.
+ * @throws IllegalStateException if no InputInformationEvent has been processed yet.
+ * @throws TezUncheckedException if the payload cannot be parsed.
+ */
+ private void processDataMovementEvent(DataMovementEvent dmEvent) {
+ Preconditions.checkState(shuffleRangeSet, "Shuffle Range must be set before a DataMovementEvent is processed");
+ DataMovementEventPayloadProto shufflePayload;
+ try {
+ shufflePayload = DataMovementEventPayloadProto.parseFrom(dmEvent.getUserPayload());
+ } catch (InvalidProtocolBufferException e) {
+ throw new TezUncheckedException("Unable to parse DataMovementEvent payload", e);
+ }
+ int partitionId = dmEvent.getSourceIndex();
+ URI baseUri = getBaseURI(shufflePayload.getHost(), shufflePayload.getPort(), partitionId);
+
+ TaskAttemptIdentifier srcAttemptIdentifier = new TaskAttemptIdentifier(dmEvent.getTargetIndex(), dmEvent.getVersion(), shufflePayload.getPathComponent());
+ scheduler.addKnownMapOutput(shufflePayload.getHost(), partitionId, baseUri.toString(), srcAttemptIdentifier);
+
+ // TODO NEWTEZ See if this duration hack can be removed.
+ int duration = shufflePayload.getRunDuration();
+ if (duration > maxMapRuntime) {
+ maxMapRuntime = duration;
+ scheduler.informMaxMapRunTime(maxMapRuntime);
+ }
+ }
+
+ /**
+ * Marks the output of the failed source attempt as obsolete in the scheduler.
+ */
+ private void processTaskFailedEvent(InputFailedEvent ifEvent) {
+ // NOTE(review): processDataMovementEvent() identifies the source task by getTargetIndex(),
+ // while this uses getSourceIndex(); confirm both name the same index, otherwise the
+ // obsoleted identifier cannot match a registered one.
+ TaskAttemptIdentifier taIdentifier = new TaskAttemptIdentifier(ifEvent.getSourceIndex(), ifEvent.getVersion());
+ scheduler.obsoleteMapOutput(taIdentifier);
+ LOG.info("Obsoleting output of src-task: " + taIdentifier);
+ }
+
+ /**
+ * Builds the base fetch URL for a host, port and partition. The application id is
+ * rewritten from "application" to "job" form for the existing ShuffleHandler. The URL ends
+ * with "&map=" - presumably the fetcher appends the source identifiers; confirm.
+ */
+ // TODO NEWTEZ Handle encrypted shuffle
+ private URI getBaseURI(String host, int port, int partitionId) {
+ StringBuilder sb = new StringBuilder("http://");
+ sb.append(host);
+ sb.append(":");
+ sb.append(port);
+ sb.append("/");
+
+ sb.append("mapOutput?job=");
+ // Required to use the existing ShuffleHandler
+ sb.append(inputContext.getApplicationId().toString().replace("application", "job"));
+
+ sb.append("&reduce=");
+ sb.append(partitionId);
+ sb.append("&map=");
+ URI u = URI.create(sb.toString());
+ return u;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1cf7f197/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/ShuffleScheduler.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/ShuffleScheduler.java b/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/ShuffleScheduler.java
index 6bd18ef..964533d 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/ShuffleScheduler.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/ShuffleScheduler.java
@@ -27,6 +27,8 @@ import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
@@ -36,12 +38,14 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.util.Progress;
import org.apache.tez.common.TezJobConfig;
-import org.apache.tez.common.TezTaskStatus;
import org.apache.tez.common.counters.TezCounter;
-import org.apache.tez.dag.records.TezTaskAttemptID;
-import org.apache.tez.dag.records.TezTaskID;
+import org.apache.tez.engine.common.TezEngineUtils;
+import org.apache.tez.engine.newapi.Event;
+import org.apache.tez.engine.newapi.TezInputContext;
+import org.apache.tez.engine.newapi.events.InputReadErrorEvent;
+
+import com.google.common.collect.Lists;
class ShuffleScheduler {
static ThreadLocal<Long> shuffleStart = new ThreadLocal<Long>() {
@@ -55,24 +59,26 @@ class ShuffleScheduler {
private static final long INITIAL_PENALTY = 10000;
private static final float PENALTY_GROWTH_RATE = 1.3f;
- private final Map<TezTaskID, MutableInt> finishedMaps;
- private final int tasksInDegree;
+ // TODO NEWTEZ May need to be a string if attempting to fetch from multiple inputs.
+ private final Map<Integer, MutableInt> finishedMaps;
+ private final int numInputs;
private int remainingMaps;
+ // Keyed by MapHost.createIdentifier(hostName, partitionId) so that all source outputs for
+ // the same host and partition share one MapHost (see addKnownMapOutput).
private Map<String, MapHost> mapLocations = new HashMap<String, MapHost>();
+ //TODO NEWTEZ Clean this and other maps at some point
+ private ConcurrentMap<String, TaskAttemptIdentifier> pathToIdentifierMap = new ConcurrentHashMap<String, TaskAttemptIdentifier>();
private Set<MapHost> pendingHosts = new HashSet<MapHost>();
- private Set<TezTaskAttemptID> obsoleteMaps = new HashSet<TezTaskAttemptID>();
+ private Set<TaskAttemptIdentifier> obsoleteMaps = new HashSet<TaskAttemptIdentifier>();
private final Random random = new Random(System.currentTimeMillis());
private final DelayQueue<Penalty> penalties = new DelayQueue<Penalty>();
private final Referee referee = new Referee();
- private final Map<TezTaskAttemptID,IntWritable> failureCounts =
- new HashMap<TezTaskAttemptID,IntWritable>();
+ private final Map<TaskAttemptIdentifier, IntWritable> failureCounts =
+ new HashMap<TaskAttemptIdentifier,IntWritable>();
private final Map<String,IntWritable> hostFailures =
new HashMap<String,IntWritable>();
- private final TezTaskStatus status;
+ private final TezInputContext inputContext;
private final Shuffle shuffle;
private final int abortFailureLimit;
- private final Progress progress;
private final TezCounter shuffledMapsCounter;
private final TezCounter reduceShuffleBytes;
private final TezCounter failedShuffleCounter;
@@ -89,26 +95,25 @@ class ShuffleScheduler {
private boolean reportReadErrorImmediately = true;
- public ShuffleScheduler(Configuration conf,
+ public ShuffleScheduler(TezInputContext inputContext,
+ Configuration conf,
int tasksInDegree,
- TezTaskStatus status,
Shuffle shuffle,
- Progress progress,
TezCounter shuffledMapsCounter,
TezCounter reduceShuffleBytes,
TezCounter failedShuffleCounter) {
- this.tasksInDegree = tasksInDegree;
+ this.inputContext = inputContext;
+ this.numInputs = tasksInDegree;
abortFailureLimit = Math.max(30, tasksInDegree / 10);
remainingMaps = tasksInDegree;
- finishedMaps = new HashMap<TezTaskID, MutableInt>(remainingMaps);
+ //TODO NEWTEZ May need to be a string or a more usable construct if attempting to fetch from multiple inputs. Define a taskId / taskAttemptId pair
+ finishedMaps = new HashMap<Integer, MutableInt>(remainingMaps);
this.shuffle = shuffle;
- this.status = status;
- this.progress = progress;
this.shuffledMapsCounter = shuffledMapsCounter;
this.reduceShuffleBytes = reduceShuffleBytes;
this.failedShuffleCounter = failedShuffleCounter;
this.startTime = System.currentTimeMillis();
- lastProgressTime = startTime;
+ this.lastProgressTime = startTime;
referee.start();
this.maxFailedUniqueFetches = Math.min(tasksInDegree,
this.maxFailedUniqueFetches);
@@ -122,19 +127,19 @@ class ShuffleScheduler {
TezJobConfig.DEFAULT_TEZ_ENGINE_SHUFFLE_NOTIFY_READERROR);
}
- public synchronized void copySucceeded(TezTaskAttemptID mapId,
+ public synchronized void copySucceeded(TaskAttemptIdentifier srcAttemptIdentifier,
MapHost host,
long bytes,
long millis,
MapOutput output
) throws IOException {
- failureCounts.remove(mapId);
+ // failureCounts is keyed by TaskAttemptIdentifier (see copyFailed); removing a derived
+ // String key would never match, so remove by the identifier itself.
+ failureCounts.remove(srcAttemptIdentifier);
hostFailures.remove(host.getHostName());
- TezTaskID taskId = mapId.getTaskID();
- if (!isFinishedTaskTrue(taskId)) {
+ if (!isFinishedTaskTrue(srcAttemptIdentifier.getTaskIndex())) {
output.commit();
- if(incrementTaskCopyAndCheckCompletion(taskId)) {
+ if(incrementTaskCopyAndCheckCompletion(srcAttemptIdentifier.getTaskIndex())) {
shuffledMapsCounter.increment(1);
if (--remainingMaps == 0) {
notifyAll();
@@ -142,38 +147,40 @@ class ShuffleScheduler {
}
// update the status
+ lastProgressTime = System.currentTimeMillis();
totalBytesShuffledTillNow += bytes;
- updateStatus();
+ logProgress();
reduceShuffleBytes.increment(bytes);
- lastProgressTime = System.currentTimeMillis();
- LOG.debug("map " + mapId + " done " + status.getStateString());
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("src task: "
+ + TezEngineUtils.getTaskAttemptIdentifier(
+ inputContext.getSourceVertexName(), srcAttemptIdentifier.getTaskIndex(),
+ srcAttemptIdentifier.getAttemptNumber()) + " done");
+ }
}
}
-
- private void updateStatus() {
+
+ // Logs the number of source outputs copied so far and the average transfer rate.
+ private void logProgress() {
float mbs = (float) totalBytesShuffledTillNow / (1024 * 1024);
- int mapsDone = tasksInDegree - remainingMaps;
+ int mapsDone = numInputs - remainingMaps;
long secsSinceStart = (System.currentTimeMillis() - startTime) / 1000 + 1;
float transferRate = mbs / secsSinceStart;
- progress.set((float) mapsDone / tasksInDegree);
- String statusString = mapsDone + " / " + tasksInDegree + " copied.";
- status.setStateString(statusString);
-
- progress.setStatus("copy(" + mapsDone + " of " + tasksInDegree + " at "
+ LOG.info("copy(" + mapsDone + " of " + numInputs + " at "
+ mbpsFormat.format(transferRate) + " MB/s)");
}
- public synchronized void copyFailed(TezTaskAttemptID mapId, MapHost host,
+ public synchronized void copyFailed(TaskAttemptIdentifier srcAttempt,
+ MapHost host,
boolean readError) {
host.penalize();
int failures = 1;
- if (failureCounts.containsKey(mapId)) {
- IntWritable x = failureCounts.get(mapId);
+ if (failureCounts.containsKey(srcAttempt)) {
+ IntWritable x = failureCounts.get(srcAttempt);
x.set(x.get() + 1);
failures = x.get();
} else {
- failureCounts.put(mapId, new IntWritable(1));
+ failureCounts.put(srcAttempt, new IntWritable(1));
}
String hostname = host.getHostName();
if (hostFailures.containsKey(hostname)) {
@@ -184,13 +191,17 @@ class ShuffleScheduler {
}
if (failures >= abortFailureLimit) {
try {
- throw new IOException(failures + " failures downloading " + mapId);
+ throw new IOException(failures
+ + " failures downloading "
+ + TezEngineUtils.getTaskAttemptIdentifier(
+ inputContext.getSourceVertexName(), srcAttempt.getTaskIndex(),
+ srcAttempt.getAttemptNumber()));
} catch (IOException ie) {
shuffle.reportException(ie);
}
}
- checkAndInformJobTracker(failures, mapId, readError);
+ checkAndInformJobTracker(failures, srcAttempt, readError);
checkReducerHealth();
@@ -206,11 +217,23 @@ class ShuffleScheduler {
// after every read error, if 'reportReadErrorImmediately' is true or
// after every 'maxFetchFailuresBeforeReporting' failures
private void checkAndInformJobTracker(
- int failures, TezTaskAttemptID mapId, boolean readError) {
+ int failures, TaskAttemptIdentifier srcAttempt, boolean readError) {
if ((reportReadErrorImmediately && readError)
|| ((failures % maxFetchFailuresBeforeReporting) == 0)) {
- LOG.info("Reporting fetch failure for " + mapId + " to jobtracker.");
- status.addFailedDependency(mapId);
+ String srcAttemptString = TezEngineUtils.getTaskAttemptIdentifier(
+ inputContext.getSourceVertexName(), srcAttempt.getTaskIndex(),
+ srcAttempt.getAttemptNumber());
+ LOG.info("Reporting fetch failure for " + srcAttemptString + " to jobtracker.");
+
+ // Report the read error as an InputReadErrorEvent through the input context; this
+ // replaces the old TezTaskStatus.addFailedDependency() path.
+ List<Event> failedEvents = Lists.newArrayListWithCapacity(1);
+ failedEvents.add(new InputReadErrorEvent("Fetch failure for " + srcAttemptString,
+ srcAttempt.getTaskIndex(), srcAttempt.getAttemptNumber()));
+
+ inputContext.sendEvents(failedEvents);
}
}
@@ -220,7 +243,7 @@ class ShuffleScheduler {
final float MAX_ALLOWED_STALL_TIME_PERCENT = 0.5f;
long totalFailures = failedShuffleCounter.getValue();
- int doneMaps = tasksInDegree - remainingMaps;
+ int doneMaps = numInputs - remainingMaps;
boolean reducerHealthy =
(((float)totalFailures / (totalFailures + doneMaps))
@@ -228,7 +251,7 @@ class ShuffleScheduler {
// check if the reducer has progressed enough
boolean reducerProgressedEnough =
- (((float)doneMaps / tasksInDegree)
+ (((float)doneMaps / numInputs)
>= MIN_REQUIRED_PROGRESS_PERCENT);
// check if the reducer is stalled for a long time
@@ -252,7 +275,7 @@ class ShuffleScheduler {
// kill if not healthy and has insufficient progress
if ((failureCounts.size() >= maxFailedUniqueFetches ||
- failureCounts.size() == (tasksInDegree - doneMaps))
+ failureCounts.size() == (numInputs - doneMaps))
&& !reducerHealthy
&& (!reducerProgressedEnough || reducerStalled)) {
LOG.fatal("Shuffle failed with too many fetch failures " +
@@ -263,28 +286,29 @@ class ShuffleScheduler {
}
- public synchronized void tipFailed(TezTaskID taskId) {
- if (!isFinishedTaskTrue(taskId)) {
- setFinishedTaskTrue(taskId);
+ public synchronized void tipFailed(int srcTaskIndex) {
+ if (!isFinishedTaskTrue(srcTaskIndex)) {
+ setFinishedTaskTrue(srcTaskIndex);
if (--remainingMaps == 0) {
notifyAll();
}
- updateStatus();
+ logProgress();
}
}
public synchronized void addKnownMapOutput(String hostName,
int partitionId,
String hostUrl,
- TezTaskAttemptID mapId) {
+ TaskAttemptIdentifier srcAttempt) {
String identifier = MapHost.createIdentifier(hostName, partitionId);
MapHost host = mapLocations.get(identifier);
if (host == null) {
host = new MapHost(partitionId, hostName, hostUrl);
assert identifier.equals(host.getIdentifier());
mapLocations.put(identifier, host);
}
- host.addKnownMap(mapId);
+ host.addKnownMap(srcAttempt);
+ pathToIdentifierMap.put(srcAttempt.getPathComponent(), srcAttempt);
// Mark the host as pending
if (host.getState() == MapHost.State.PENDING) {
@@ -293,13 +317,14 @@ class ShuffleScheduler {
}
}
- public synchronized void obsoleteMapOutput(TezTaskAttemptID mapId) {
- obsoleteMaps.add(mapId);
+ /**
+ * Marks the output of a source attempt as obsolete so getMapsForHost() skips it.
+ * @param srcAttempt the failed source attempt; it carries no path component.
+ */
+ public synchronized void obsoleteMapOutput(TaskAttemptIdentifier srcAttempt) {
+ // The incoming srcAttempt does not contain a path component, while the identifiers
+ // registered through addKnownMapOutput() do, and TaskAttemptIdentifier.hashCode()
+ // includes the path component. On its own it would therefore never match those
+ // identifiers in obsoleteMaps. Also mark every already-registered identifier with the
+ // same task index and attempt number. Identifiers registered after this call are not
+ // covered.
+ obsoleteMaps.add(srcAttempt);
+ for (TaskAttemptIdentifier known : pathToIdentifierMap.values()) {
+ if (known.getTaskIndex() == srcAttempt.getTaskIndex()
+ && known.getAttemptNumber() == srcAttempt.getAttemptNumber()) {
+ obsoleteMaps.add(known);
+ }
+ }
}
- public synchronized void putBackKnownMapOutput(MapHost host,
- TezTaskAttemptID mapId) {
- host.addKnownMap(mapId);
+ /**
+ * Re-adds srcAttempt to host's known maps via host.addKnownMap(). Presumably called for
+ * attempts handed out by getMapsForHost() that were not fetched - confirm with callers.
+ */
+ public synchronized void putBackKnownMapOutput(MapHost host,
+ TaskAttemptIdentifier srcAttempt) {
+ host.addKnownMap(srcAttempt);
}
public synchronized MapHost getHost() throws InterruptedException {
@@ -324,16 +349,20 @@ class ShuffleScheduler {
return host;
}
- public synchronized List<TezTaskAttemptID> getMapsForHost(MapHost host) {
- List<TezTaskAttemptID> list = host.getAndClearKnownMaps();
- Iterator<TezTaskAttemptID> itr = list.iterator();
- List<TezTaskAttemptID> result = new ArrayList<TezTaskAttemptID>();
+ /**
+ * Returns the identifier registered under pathComponent by addKnownMapOutput(), or null if
+ * none is known. Not synchronized: pathToIdentifierMap is a ConcurrentMap.
+ */
+ public TaskAttemptIdentifier getIdentifierForPathComponent(String pathComponent) {
+ return pathToIdentifierMap.get(pathComponent);
+ }
+
+ public synchronized List<TaskAttemptIdentifier> getMapsForHost(MapHost host) {
+ List<TaskAttemptIdentifier> list = host.getAndClearKnownMaps();
+ Iterator<TaskAttemptIdentifier> itr = list.iterator();
+ List<TaskAttemptIdentifier> result = new ArrayList<TaskAttemptIdentifier>();
int includedMaps = 0;
int totalSize = list.size();
// find the maps that we still need, up to the limit
while (itr.hasNext()) {
- TezTaskAttemptID id = itr.next();
- if (!obsoleteMaps.contains(id) && !isFinishedTaskTrue(id.getTaskID())) {
+ TaskAttemptIdentifier id = itr.next();
+ if (!obsoleteMaps.contains(id) && !isFinishedTaskTrue(id.getTaskIndex())) {
result.add(id);
if (++includedMaps >= MAX_MAPS_AT_ONCE) {
break;
@@ -342,8 +371,8 @@ class ShuffleScheduler {
}
// put back the maps left after the limit
while (itr.hasNext()) {
- TezTaskAttemptID id = itr.next();
- if (!obsoleteMaps.contains(id) && !isFinishedTaskTrue(id.getTaskID())) {
+ TaskAttemptIdentifier id = itr.next();
+ if (!obsoleteMaps.contains(id) && !isFinishedTaskTrue(id.getTaskIndex())) {
host.addKnownMap(id);
}
}
@@ -367,8 +396,17 @@ class ShuffleScheduler {
mapLocations.clear();
obsoleteMaps.clear();
pendingHosts.clear();
+ pathToIdentifierMap.clear();
}
-
+
+ /**
+ * Utility method to check if the Shuffle data fetch is complete.
+ * @return true once remainingMaps has reached 0, i.e. every expected source output has been
+ * copied or its task marked failed via tipFailed(); false otherwise.
+ */
+ public synchronized boolean isDone() {
+ return remainingMaps == 0;
+ }
+
/**
* Wait until the shuffle finishes or until the timeout.
* @param millis maximum wait time
@@ -448,27 +486,27 @@ class ShuffleScheduler {
}
}
- void setFinishedTaskTrue(TezTaskID taskId) {
+ /**
+ * Records shuffle.getReduceRange() completed copies for srcTaskIndex, presumably so that
+ * isFinishedTaskTrue() reports the task as finished. Guarded by the finishedMaps monitor.
+ */
+ void setFinishedTaskTrue(int srcTaskIndex) {
synchronized(finishedMaps) {
- finishedMaps.put(taskId, new MutableInt(shuffle.getReduceRange()));
+ finishedMaps.put(srcTaskIndex, new MutableInt(shuffle.getReduceRange()));
}
}
- boolean incrementTaskCopyAndCheckCompletion(TezTaskID mapTaskId) {
+ /**
+ * Increments the copy count for srcTaskIndex (starting from 0 if absent) and returns
+ * isFinishedTaskTrue(srcTaskIndex). Guarded by the finishedMaps monitor.
+ */
+ boolean incrementTaskCopyAndCheckCompletion(int srcTaskIndex) {
synchronized(finishedMaps) {
- MutableInt result = finishedMaps.get(mapTaskId);
+ MutableInt result = finishedMaps.get(srcTaskIndex);
if(result == null) {
result = new MutableInt(0);
- finishedMaps.put(mapTaskId, result);
+ finishedMaps.put(srcTaskIndex, result);
}
result.increment();
- return isFinishedTaskTrue(mapTaskId);
+ return isFinishedTaskTrue(srcTaskIndex);
}
}
- boolean isFinishedTaskTrue(TezTaskID taskId) {
+ boolean isFinishedTaskTrue(int srcTaskIndex) {
synchronized (finishedMaps) {
- MutableInt result = finishedMaps.get(taskId);
+ MutableInt result = finishedMaps.get(srcTaskIndex);
if(result == null) {
return false;
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1cf7f197/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/TaskAttemptIdentifier.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/TaskAttemptIdentifier.java b/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/TaskAttemptIdentifier.java
new file mode 100644
index 0000000..f77166d
--- /dev/null
+++ b/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/TaskAttemptIdentifier.java
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.engine.common.shuffle.impl;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+
+/**
+ * Immutable container for a task index, an attempt number for that task,
+ * and an optional path component (null when not supplied).
+ */
+@Private
+public class TaskAttemptIdentifier {
+
+ private final int taskIndex;
+ private final int attemptNumber;
+ // May be null; final so hashCode/equals cannot change after construction.
+ private final String pathComponent;
+
+ public TaskAttemptIdentifier(int taskIndex, int attemptNumber) {
+ this(taskIndex, attemptNumber, null);
+ }
+
+ public TaskAttemptIdentifier(int taskIndex, int attemptNumber, String pathComponent) {
+ this.taskIndex = taskIndex;
+ this.attemptNumber = attemptNumber;
+ this.pathComponent = pathComponent;
+ }
+
+ public int getTaskIndex() {
+ return taskIndex;
+ }
+
+ public int getAttemptNumber() {
+ return attemptNumber;
+ }
+
+ /** @return the path component, or null if none was supplied. */
+ public String getPathComponent() {
+ return pathComponent;
+ }
+
+ @Override
+ public int hashCode() {
+ final int prime = 31;
+ int result = 1;
+ result = prime * result + attemptNumber;
+ result = prime * result
+ + ((pathComponent == null) ? 0 : pathComponent.hashCode());
+ result = prime * result + taskIndex;
+ return result;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj)
+ return true;
+ if (obj == null)
+ return false;
+ if (getClass() != obj.getClass())
+ return false;
+ TaskAttemptIdentifier other = (TaskAttemptIdentifier) obj;
+ if (attemptNumber != other.attemptNumber)
+ return false;
+ if (pathComponent == null) {
+ if (other.pathComponent != null)
+ return false;
+ } else if (!pathComponent.equals(other.pathComponent))
+ return false;
+ if (taskIndex != other.taskIndex)
+ return false;
+ return true;
+ }
+
+ @Override
+ public String toString() {
+ return "TaskAttemptIdentifier [taskIndex=" + taskIndex + ", attemptNumber="
+ + attemptNumber + ", pathComponent=" + pathComponent + "]";
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1cf7f197/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/server/ShuffleHandler.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/server/ShuffleHandler.java b/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/server/ShuffleHandler.java
index 0befaa8..f61670e 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/server/ShuffleHandler.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/server/ShuffleHandler.java
@@ -63,13 +63,14 @@ import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.server.api.ApplicationInitializationContext;
import org.apache.hadoop.yarn.server.api.ApplicationTerminationContext;
import org.apache.hadoop.yarn.server.api.AuxiliaryService;
-import org.apache.tez.common.RunningTaskContext;
import org.apache.tez.common.TezJobConfig;
import org.apache.tez.engine.common.security.JobTokenIdentifier;
import org.apache.tez.engine.common.security.JobTokenSecretManager;
import org.apache.tez.engine.common.security.SecureShuffleUtils;
import org.apache.tez.engine.common.shuffle.impl.ShuffleHeader;
import org.apache.tez.engine.common.sort.impl.ExternalSorter;
+import org.apache.tez.engine.newapi.TezOutputContext;
+import org.apache.tez.engine.shuffle.common.ShuffleUtils;
import org.jboss.netty.bootstrap.ServerBootstrap;
import org.jboss.netty.buffer.ChannelBuffers;
import org.jboss.netty.channel.Channel;
@@ -245,9 +246,9 @@ public class ShuffleHandler extends AuxiliaryService {
userRsrc.remove(appId.toString());
}
- public synchronized void init(Configuration conf, RunningTaskContext task) {
+ public void initialize(TezOutputContext outputContext, Configuration conf) throws IOException {
this.init(new Configuration(conf));
- tokenSecret = task.getJobTokenSecret();
+ tokenSecret = ShuffleUtils.getJobTokenSecretFromTokenBytes(outputContext.getServiceConsumerMetaData(MAPREDUCE_SHUFFLE_SERVICEID));
}
@Override
@@ -441,14 +442,16 @@ public class ShuffleHandler extends AuxiliaryService {
for (String mapId : mapIds) {
try {
// TODO: Error handling - validate mapId via TezTaskAttemptId.forName
- if (!mapId.equals(sorter.getTaskAttemptId().toString())) {
- String errorMessage =
- "Illegal shuffle request mapId: " + mapId
- + " while actual mapId is " + sorter.getTaskAttemptId();
- LOG.warn(errorMessage);
- sendError(ctx, errorMessage, BAD_REQUEST);
- return;
- }
+
+ // TODO NEWTEZ Fix this. TaskAttemptId is no longer valid. mapId validation will not work anymore.
+// if (!mapId.equals(sorter.getTaskAttemptId().toString())) {
+// String errorMessage =
+// "Illegal shuffle request mapId: " + mapId
+// + " while actual mapId is " + sorter.getTaskAttemptId();
+// LOG.warn(errorMessage);
+// sendError(ctx, errorMessage, BAD_REQUEST);
+// return;
+// }
lastMap =
sendMapOutput(ctx, ch, userRsrc.get(jobId), jobId, mapId, reduceId);
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1cf7f197/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/ExternalSorter.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/ExternalSorter.java b/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/ExternalSorter.java
index b90682e..4df1c01 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/ExternalSorter.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/ExternalSorter.java
@@ -21,6 +21,7 @@ package org.apache.tez.engine.common.sort.impl;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
+import java.lang.reflect.Constructor;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -36,47 +37,41 @@ import org.apache.hadoop.io.compress.DefaultCodec;
import org.apache.hadoop.io.serializer.SerializationFactory;
import org.apache.hadoop.io.serializer.Serializer;
import org.apache.hadoop.util.IndexedSorter;
-import org.apache.hadoop.util.Progress;
+import org.apache.hadoop.util.Progressable;
import org.apache.hadoop.util.QuickSort;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.tez.common.Constants;
-import org.apache.tez.common.RunningTaskContext;
-import org.apache.tez.common.TezEngineTaskContext;
import org.apache.tez.common.TezJobConfig;
import org.apache.tez.common.counters.TaskCounter;
import org.apache.tez.common.counters.TezCounter;
-import org.apache.tez.dag.records.TezTaskAttemptID;
-import org.apache.tez.engine.api.Input;
-import org.apache.tez.engine.api.Master;
-import org.apache.tez.engine.api.Output;
+import org.apache.tez.dag.api.TezUncheckedException;
import org.apache.tez.engine.api.Partitioner;
import org.apache.tez.engine.api.Processor;
import org.apache.tez.engine.common.ConfigUtils;
-import org.apache.tez.engine.common.combine.CombineInput;
-import org.apache.tez.engine.common.combine.CombineOutput;
import org.apache.tez.engine.common.shuffle.impl.ShuffleHeader;
-import org.apache.tez.engine.common.sort.impl.IFile.Writer;
-import org.apache.tez.engine.common.task.local.output.TezTaskOutput;
-import org.apache.tez.engine.common.task.local.output.TezTaskOutputFiles;
+import org.apache.tez.engine.common.task.local.newoutput.TezTaskOutput;
+import org.apache.tez.engine.common.task.local.newoutput.TezTaskOutputFiles;
+import org.apache.tez.engine.hadoop.compat.NullProgressable;
+import org.apache.tez.engine.newapi.TezOutputContext;
import org.apache.tez.engine.records.OutputContext;
+import org.apache.tez.engine.common.sort.impl.IFile.Writer;
@SuppressWarnings({"unchecked", "rawtypes"})
public abstract class ExternalSorter {
private static final Log LOG = LogFactory.getLog(ExternalSorter.class);
- public abstract void close() throws IOException, InterruptedException;
+ public abstract void close() throws IOException;
- public abstract void flush() throws IOException, InterruptedException;
+ public abstract void flush() throws IOException;
- public abstract void write(Object key, Object value) throws IOException,
- InterruptedException;
+ public abstract void write(Object key, Object value) throws IOException;
+ protected Progressable nullProgressable = new NullProgressable();
+ protected TezOutputContext outputContext;
protected Processor combineProcessor;
protected Partitioner partitioner;
- protected TezEngineTaskContext task;
- protected RunningTaskContext runningTaskContext;
- protected Configuration job;
+ protected Configuration conf;
protected FileSystem rfs;
protected TezTaskOutput mapOutputFile;
protected int partitions;
@@ -92,69 +87,68 @@ public abstract class ExternalSorter {
// Compression for map-outputs
protected CompressionCodec codec;
+ // TODO NEWTEZ Setup CombineProcessor
+ // TODO NEWTEZ Setup Partitioner in SimpleOutput
+
// Counters
+ // TODO TEZ Rename all counter variables [Mapping of counter to MR for compatibility in the MR layer]
protected TezCounter mapOutputByteCounter;
protected TezCounter mapOutputRecordCounter;
protected TezCounter fileOutputByteCounter;
protected TezCounter spilledRecordsCounter;
- protected Progress sortPhase;
- public void initialize(Configuration conf, Master master)
- throws IOException, InterruptedException {
+ public void initialize(TezOutputContext outputContext, Configuration conf, int numOutputs) throws IOException {
+ this.outputContext = outputContext;
+ this.conf = conf;
+ this.partitions = numOutputs;
- this.job = conf;
- LOG.info("TEZ_ENGINE_TASK_ATTEMPT_ID: " +
- job.get(Constants.TEZ_ENGINE_TASK_ATTEMPT_ID));
-
- partitions = task.getOutputSpecList().get(0).getNumOutputs();
-// partitions =
-// job.getInt(
-// TezJobConfig.TEZ_ENGINE_TASK_OUTDEGREE,
-// TezJobConfig.DEFAULT_TEZ_ENGINE_TASK_OUTDEGREE);
- rfs = ((LocalFileSystem)FileSystem.getLocal(job)).getRaw();
+ rfs = ((LocalFileSystem)FileSystem.getLocal(this.conf)).getRaw();
// sorter
- sorter = ReflectionUtils.newInstance(job.getClass(
+ sorter = ReflectionUtils.newInstance(this.conf.getClass(
TezJobConfig.TEZ_ENGINE_INTERNAL_SORTER_CLASS, QuickSort.class,
- IndexedSorter.class), job);
+ IndexedSorter.class), this.conf);
- comparator = ConfigUtils.getIntermediateOutputKeyComparator(job);
+ comparator = ConfigUtils.getIntermediateOutputKeyComparator(this.conf);
// k/v serialization
- keyClass = ConfigUtils.getIntermediateOutputKeyClass(job);
- valClass = ConfigUtils.getIntermediateOutputValueClass(job);
- serializationFactory = new SerializationFactory(job);
+ keyClass = ConfigUtils.getIntermediateOutputKeyClass(this.conf);
+ valClass = ConfigUtils.getIntermediateOutputValueClass(this.conf);
+ serializationFactory = new SerializationFactory(this.conf);
keySerializer = serializationFactory.getSerializer(keyClass);
valSerializer = serializationFactory.getSerializer(valClass);
// counters
mapOutputByteCounter =
- runningTaskContext.getTaskReporter().getCounter(TaskCounter.MAP_OUTPUT_BYTES);
+ outputContext.getCounters().findCounter(TaskCounter.MAP_OUTPUT_BYTES);
mapOutputRecordCounter =
- runningTaskContext.getTaskReporter().getCounter(TaskCounter.MAP_OUTPUT_RECORDS);
+ outputContext.getCounters().findCounter(TaskCounter.MAP_OUTPUT_RECORDS);
fileOutputByteCounter =
- runningTaskContext.getTaskReporter().
- getCounter(TaskCounter.MAP_OUTPUT_MATERIALIZED_BYTES);
+ outputContext.getCounters().findCounter(TaskCounter.MAP_OUTPUT_MATERIALIZED_BYTES);
spilledRecordsCounter =
- runningTaskContext.getTaskReporter().getCounter(TaskCounter.SPILLED_RECORDS);
+ outputContext.getCounters().findCounter(TaskCounter.SPILLED_RECORDS);
// compression
- if (ConfigUtils.shouldCompressIntermediateOutput(job)) {
+ if (ConfigUtils.shouldCompressIntermediateOutput(this.conf)) {
Class<? extends CompressionCodec> codecClass =
- ConfigUtils.getIntermediateOutputCompressorClass(job, DefaultCodec.class);
- codec = ReflectionUtils.newInstance(codecClass, job);
+ ConfigUtils.getIntermediateOutputCompressorClass(this.conf, DefaultCodec.class);
+ codec = ReflectionUtils.newInstance(codecClass, this.conf);
} else {
codec = null;
}
// Task outputs
- mapOutputFile =
- (TezTaskOutput) ReflectionUtils.newInstance(
- conf.getClass(
- Constants.TEZ_ENGINE_TASK_OUTPUT_MANAGER,
- TezTaskOutputFiles.class), conf);
-
- // sortPhase
- sortPhase = runningTaskContext.getProgress().addPhase("sort");
+ mapOutputFile = instantiateTaskOutputManager(this.conf, outputContext);
+ }
+
+ // TODO NEWTEZ Add an interface (! Processor) for CombineProcessor, which MR tasks can initialize and set.
+ // Alternatively add a config key with a classname, which is easy to initialize.
+ // Sets the combiner used when writing spills/merged output; sorters skip combining while it is null.
+ public void setCombiner(Processor combineProcessor) {
+ this.combineProcessor = combineProcessor;
+ }
+
+ // TODO NEWTEZ Setup a config value for the Partitioner class, from where it can be initialized.
+ // Sets the partitioner that sorter write() implementations use to pick each record's partition.
+ public void setPartitioner(Partitioner partitioner) {
+ this.partitioner = partitioner;
+ }
/**
@@ -168,42 +162,33 @@ public abstract class ExternalSorter {
}
}
- public void setTask(RunningTaskContext task) {
- this.runningTaskContext = task;
- this.combineProcessor = task.getCombineProcessor();
- this.partitioner = task.getPartitioner();
- }
-
- public TezTaskAttemptID getTaskAttemptId() {
- return task.getTaskAttemptId();
- }
-
@Private
public TezTaskOutput getMapOutput() {
return mapOutputFile;
}
protected void runCombineProcessor(TezRawKeyValueIterator kvIter,
- Writer writer) throws IOException, InterruptedException {
-
- CombineInput combineIn = new CombineInput(kvIter);
- combineIn.initialize(job, runningTaskContext.getTaskReporter());
-
- CombineOutput combineOut = new CombineOutput(writer);
- combineOut.initialize(job, runningTaskContext.getTaskReporter());
-
- try {
- combineProcessor.process(new Input[] {combineIn},
- new Output[] {combineOut});
- } catch (IOException ioe) {
- try {
- combineProcessor.close();
- } catch (IOException ignored) {}
-
- // Do not close output here as the sorter should close the combine output
-
- throw ioe;
- }
+ Writer writer) throws IOException {
+
+ // TODO NEWTEZ Fix Combiner: run combineProcessor here once it is wired to the new API.
+ // Until then, write the merged records through uncombined. Callers already skip the
+ // combiner in some cases (e.g. too few spills), so uncombined output is valid; an empty
+ // body would silently drop every record routed here. The caller closes the writer.
+ TezMerger.writeFile(kvIter, writer, nullProgressable, conf);
}
@@ -228,10 +213,6 @@ public abstract class ExternalSorter {
}
}
- public ExternalSorter(TezEngineTaskContext tezEngineTask) {
- this.task = tezEngineTask;
- }
-
public InputStream getSortedStream(int partition) {
throw new UnsupportedOperationException("getSortedStream isn't supported!");
}
@@ -243,4 +224,23 @@ public abstract class ExternalSorter {
public OutputContext getOutputContext() {
return null;
}
+
+ /**
+ * Instantiates the configured TezTaskOutput through its public (Configuration, String) constructor, passing the output's unique identifier.
+ */
+ private TezTaskOutput instantiateTaskOutputManager(Configuration conf, TezOutputContext outputContext) {
+ Class<?> clazz = conf.getClass(Constants.TEZ_ENGINE_TASK_OUTPUT_MANAGER,
+ TezTaskOutputFiles.class);
+ try {
+ // getConstructor() only returns public constructors, so setAccessible is unnecessary.
+ Constructor<?> ctor = clazz.getConstructor(Configuration.class, String.class);
+ return (TezTaskOutput) ctor.newInstance(conf, outputContext.getUniqueIdentifier());
+ } catch (Exception e) {
+ throw new TezUncheckedException(
+ "Unable to instantiate configured TezOutputFileManager: "
+ + conf.get(Constants.TEZ_ENGINE_TASK_OUTPUT_MANAGER,
+ TezTaskOutputFiles.class.getName()), e);
+ }
+ }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1cf7f197/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/IFileOutputStream.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/IFileOutputStream.java b/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/IFileOutputStream.java
index 00b8958..3b39900 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/IFileOutputStream.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/IFileOutputStream.java
@@ -22,8 +22,6 @@ import java.io.FilterOutputStream;
import java.io.IOException;
import java.io.OutputStream;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.util.DataChecksum;
[2/4] TEZ-417. Change Shuffle Input/Output to work with the new APIs
(part of TEZ-398). (sseth)
Posted by ss...@apache.org.
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1cf7f197/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/PipelinedSorter.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/PipelinedSorter.java b/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/PipelinedSorter.java
index 4da5ffa..bafbd4d 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/PipelinedSorter.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/PipelinedSorter.java
@@ -43,18 +43,15 @@ import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.util.IndexedSortable;
import org.apache.hadoop.util.IndexedSorter;
import org.apache.hadoop.util.Progress;
-import org.apache.tez.common.TezEngineTaskContext;
import org.apache.tez.common.TezJobConfig;
-import org.apache.tez.dag.records.TezTaskAttemptID;
-import org.apache.tez.engine.api.Master;
import org.apache.tez.engine.common.ConfigUtils;
-import org.apache.tez.engine.common.sort.SortingOutput;
+import org.apache.tez.engine.newapi.TezOutputContext;
+import org.apache.tez.engine.records.OutputContext;
import org.apache.tez.engine.common.sort.impl.IFile.Writer;
import org.apache.tez.engine.common.sort.impl.TezMerger.Segment;
-import org.apache.tez.engine.records.OutputContext;
@SuppressWarnings({"unchecked", "rawtypes"})
-public class PipelinedSorter extends ExternalSorter implements SortingOutput {
+public class PipelinedSorter extends ExternalSorter {
private static final Log LOG = LogFactory.getLog(PipelinedSorter.class);
@@ -92,32 +89,21 @@ public class PipelinedSorter extends ExternalSorter implements SortingOutput {
private int totalIndexCacheMemory;
private int indexCacheMemoryLimit;
-
- public PipelinedSorter(TezEngineTaskContext task) throws IOException {
- super(task);
- }
-
- public void initialize(Configuration conf, Master master)
- throws IOException, InterruptedException {
-
- if (task == null) {
- LOG.info("Bailing!", new IOException());
- return;
- }
- super.initialize(conf, master);
+ public void initialize(TezOutputContext outputContext, Configuration conf, int numOutputs) throws IOException {
+ super.initialize(outputContext, conf, numOutputs);
partitionBits = bitcount(partitions)+1;
//sanity checks
final float spillper =
- job.getFloat(
+ this.conf.getFloat(
TezJobConfig.TEZ_ENGINE_SORT_SPILL_PERCENT,
TezJobConfig.DEFAULT_TEZ_ENGINE_SORT_SPILL_PERCENT);
final int sortmb =
- job.getInt(
+ this.conf.getInt(
TezJobConfig.TEZ_ENGINE_IO_SORT_MB,
TezJobConfig.DEFAULT_TEZ_ENGINE_IO_SORT_MB);
- indexCacheMemoryLimit = job.getInt(TezJobConfig.TEZ_ENGINE_INDEX_CACHE_MEMORY_LIMIT_BYTES,
+ indexCacheMemoryLimit = this.conf.getInt(TezJobConfig.TEZ_ENGINE_INDEX_CACHE_MEMORY_LIMIT_BYTES,
TezJobConfig.DEFAULT_TEZ_ENGINE_INDEX_CACHE_MEMORY_LIMIT_BYTES);
if (spillper > (float)1.0 || spillper <= (float)0.0) {
throw new IOException("Invalid \"" + TezJobConfig.TEZ_ENGINE_SORT_SPILL_PERCENT +
@@ -137,7 +123,7 @@ public class PipelinedSorter extends ExternalSorter implements SortingOutput {
span = new SortSpan(largeBuffer, 1024*1024, 16);
merger = new SpanMerger(comparator);
final int sortThreads =
- job.getInt(
+ this.conf.getInt(
TezJobConfig.TEZ_ENGINE_SORT_THREADS,
TezJobConfig.DEFAULT_TEZ_ENGINE_SORT_THREADS);
sortmaster = Executors.newFixedThreadPool(sortThreads);
@@ -151,7 +137,7 @@ public class PipelinedSorter extends ExternalSorter implements SortingOutput {
}
valSerializer.open(span.out);
keySerializer.open(span.out);
- minSpillsForCombine = job.getInt(TezJobConfig.TEZ_ENGINE_COMBINE_MIN_SPILLS, 3);
+ minSpillsForCombine = this.conf.getInt(TezJobConfig.TEZ_ENGINE_COMBINE_MIN_SPILLS, 3);
}
private int bitcount(int n) {
@@ -193,8 +179,9 @@ public class PipelinedSorter extends ExternalSorter implements SortingOutput {
keySerializer.open(span.out);
}
+ @Override
public void write(Object key, Object value)
- throws IOException, InterruptedException {
+ throws IOException {
collect(
key, value, partitioner.getPartition(key, value, partitions));
}
@@ -206,7 +193,6 @@ public class PipelinedSorter extends ExternalSorter implements SortingOutput {
*/
synchronized void collect(Object key, Object value, final int partition
) throws IOException {
- runningTaskContext.getTaskReporter().progress();
if (key.getClass() != keyClass) {
throw new IOException("Type mismatch in key from map: expected "
+ keyClass.getName() + ", received "
@@ -262,7 +248,6 @@ public class PipelinedSorter extends ExternalSorter implements SortingOutput {
}
mapOutputRecordCounter.increment(1);
mapOutputByteCounter.increment(valend - keystart);
- runningTaskContext.getTaskReporter().progress();
}
public void spill() throws IOException {
@@ -282,7 +267,7 @@ public class PipelinedSorter extends ExternalSorter implements SortingOutput {
//write merged output to disk
long segmentStart = out.getPos();
Writer writer =
- new Writer(job, out, keyClass, valClass, codec,
+ new Writer(conf, out, keyClass, valClass, codec,
spilledRecordsCounter);
writer.setRLE(merger.needsRLE());
if (combineProcessor == null) {
@@ -308,7 +293,7 @@ public class PipelinedSorter extends ExternalSorter implements SortingOutput {
mapOutputFile.getSpillIndexFileForWrite(numSpills, partitions
* MAP_OUTPUT_INDEX_RECORD_LENGTH);
// TODO: cache
- spillRec.writeToFile(indexFilename, job);
+ spillRec.writeToFile(indexFilename, conf);
++numSpills;
} catch(InterruptedException ie) {
// TODO:the combiner has been interrupted
@@ -318,8 +303,8 @@ public class PipelinedSorter extends ExternalSorter implements SortingOutput {
}
@Override
- public void flush() throws IOException, InterruptedException {
- final TezTaskAttemptID mapId = task.getTaskAttemptId();
+ public void flush() throws IOException {
+ final String uniqueIdentifier = outputContext.getUniqueIdentifier();
Path finalOutputFile =
mapOutputFile.getOutputFileForWrite(0); //TODO
Path finalIndexFile =
@@ -347,8 +332,7 @@ public class PipelinedSorter extends ExternalSorter implements SortingOutput {
//The output stream for the final single output file
FSDataOutputStream finalOut = rfs.create(finalOutputFile, true, 4096);
-
- sortPhase.addPhases(partitions); // Divide sort phase into sub-phases
+
TezMerger.considerFinalMergeForProgress();
final TezSpillRecord spillRec = new TezSpillRecord(partitions);
@@ -357,7 +341,7 @@ public class PipelinedSorter extends ExternalSorter implements SortingOutput {
for(int i = 0; i < numSpills; i++) {
// TODO: build this cache before
Path indexFilename = mapOutputFile.getSpillIndexFile(i);
- TezSpillRecord spillIndex = new TezSpillRecord(indexFilename, job);
+ TezSpillRecord spillIndex = new TezSpillRecord(indexFilename, conf);
indexCacheList.add(spillIndex);
}
@@ -370,34 +354,34 @@ public class PipelinedSorter extends ExternalSorter implements SortingOutput {
TezIndexRecord indexRecord = indexCacheList.get(i).getIndex(parts);
Segment s =
- new Segment(job, rfs, spillFilename, indexRecord.getStartOffset(),
+ new Segment(conf, rfs, spillFilename, indexRecord.getStartOffset(),
indexRecord.getPartLength(), codec, true);
segmentList.add(i, s);
}
int mergeFactor =
- job.getInt(TezJobConfig.TEZ_ENGINE_IO_SORT_FACTOR,
+ this.conf.getInt(TezJobConfig.TEZ_ENGINE_IO_SORT_FACTOR,
TezJobConfig.DEFAULT_TEZ_ENGINE_IO_SORT_FACTOR);
// sort the segments only if there are intermediate merges
boolean sortSegments = segmentList.size() > mergeFactor;
//merge
- @SuppressWarnings("unchecked")
- TezRawKeyValueIterator kvIter = TezMerger.merge(job, rfs,
+ TezRawKeyValueIterator kvIter = TezMerger.merge(conf, rfs,
keyClass, valClass, codec,
segmentList, mergeFactor,
- new Path(mapId.toString()),
- (RawComparator)ConfigUtils.getIntermediateOutputKeyComparator(job),
- runningTaskContext.getTaskReporter(), sortSegments,
- null, spilledRecordsCounter, sortPhase.phase());
+ new Path(uniqueIdentifier),
+ (RawComparator)ConfigUtils.getIntermediateOutputKeyComparator(conf),
+ nullProgressable, sortSegments,
+ null, spilledRecordsCounter,
+ null); // Not using any Progress in TezMerger. Should just work.
//write merged output to disk
long segmentStart = finalOut.getPos();
Writer writer =
- new Writer(job, finalOut, keyClass, valClass, codec,
+ new Writer(conf, finalOut, keyClass, valClass, codec,
spilledRecordsCounter);
writer.setRLE(merger.needsRLE());
if (combineProcessor == null || numSpills < minSpillsForCombine) {
- TezMerger.writeFile(kvIter, writer, runningTaskContext.getTaskReporter(), job);
+ TezMerger.writeFile(kvIter, writer, nullProgressable, conf);
} else {
runCombineProcessor(kvIter, writer);
}
@@ -405,8 +389,6 @@ public class PipelinedSorter extends ExternalSorter implements SortingOutput {
//close
writer.close();
- sortPhase.startNextPhase();
-
// record offsets
final TezIndexRecord rec =
new TezIndexRecord(
@@ -416,7 +398,7 @@ public class PipelinedSorter extends ExternalSorter implements SortingOutput {
spillRec.putIndex(rec, parts);
}
- spillRec.writeToFile(finalIndexFile, job);
+ spillRec.writeToFile(finalIndexFile, conf);
finalOut.close();
for(int i = 0; i < numSpills; i++) {
Path indexFilename = mapOutputFile.getSpillIndexFile(i);
@@ -520,7 +502,7 @@ public class PipelinedSorter extends ExternalSorter implements SortingOutput {
kj = new byte[keymax];
LOG.info("begin sorting Span"+index + " ("+length()+")");
if(length() > 1) {
- sorter.sort(this, 0, length(), runningTaskContext.getTaskReporter());
+ sorter.sort(this, 0, length(), nullProgressable);
}
LOG.info("done sorting Span"+index);
return new SpanIterator(this);
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1cf7f197/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/TezMerger.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/TezMerger.java b/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/TezMerger.java
index eeea764..7815569 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/TezMerger.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/TezMerger.java
@@ -55,6 +55,7 @@ import org.apache.tez.engine.common.sort.impl.IFile.Writer;
public class TezMerger {
private static final Log LOG = LogFactory.getLog(TezMerger.class);
+
// Local directories
private static LocalDirAllocator lDirAlloc =
new LocalDirAllocator(TezJobConfig.LOCAL_DIRS);
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1cf7f197/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/dflt/DefaultSorter.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/dflt/DefaultSorter.java b/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/dflt/DefaultSorter.java
index adbff22..b1e17e7 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/dflt/DefaultSorter.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/dflt/DefaultSorter.java
@@ -39,11 +39,7 @@ import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.util.IndexedSortable;
import org.apache.hadoop.util.Progress;
import org.apache.hadoop.util.StringUtils;
-import org.apache.tez.common.TezEngineTaskContext;
import org.apache.tez.common.TezJobConfig;
-import org.apache.tez.common.TezTaskContext;
-import org.apache.tez.dag.records.TezTaskAttemptID;
-import org.apache.tez.engine.api.Master;
import org.apache.tez.engine.common.ConfigUtils;
import org.apache.tez.engine.common.sort.impl.ExternalSorter;
import org.apache.tez.engine.common.sort.impl.IFile;
@@ -53,13 +49,15 @@ import org.apache.tez.engine.common.sort.impl.TezRawKeyValueIterator;
import org.apache.tez.engine.common.sort.impl.TezSpillRecord;
import org.apache.tez.engine.common.sort.impl.IFile.Writer;
import org.apache.tez.engine.common.sort.impl.TezMerger.Segment;
-import org.apache.tez.engine.records.OutputContext;
+import org.apache.tez.engine.newapi.TezOutputContext;
@SuppressWarnings({"unchecked", "rawtypes"})
public class DefaultSorter extends ExternalSorter implements IndexedSortable {
-
+
private static final Log LOG = LogFactory.getLog(DefaultSorter.class);
+ // TODO NEWTEZ Progress reporting to Tez framework. (making progress vs %complete)
+
/**
* The size of each record in the index file for the map-outputs.
*/
@@ -112,26 +110,15 @@ public class DefaultSorter extends ExternalSorter implements IndexedSortable {
private int totalIndexCacheMemory;
private int indexCacheMemoryLimit;
- public DefaultSorter(TezTaskContext task) throws IOException {
- // Does this assisted inject work ?
- super((TezEngineTaskContext)task);
- }
-
@Override
- public void initialize(Configuration conf, Master master)
- throws IOException, InterruptedException {
- if (task == null) {
- LOG.info("Bailing!", new IOException());
- return;
- }
-
- super.initialize(conf, master);
+ public void initialize(TezOutputContext outputContext, Configuration conf, int numOutputs) throws IOException {
+ super.initialize(outputContext, conf, numOutputs);
// sanity checks
- final float spillper = job.getFloat(
+ final float spillper = this.conf.getFloat(
TezJobConfig.TEZ_ENGINE_SORT_SPILL_PERCENT,
TezJobConfig.DEFAULT_TEZ_ENGINE_SORT_SPILL_PERCENT);
- final int sortmb = job.getInt(TezJobConfig.TEZ_ENGINE_IO_SORT_MB,
+ final int sortmb = this.conf.getInt(TezJobConfig.TEZ_ENGINE_IO_SORT_MB,
TezJobConfig.DEFAULT_TEZ_ENGINE_IO_SORT_MB);
if (spillper > (float) 1.0 || spillper <= (float) 0.0) {
throw new IOException("Invalid \""
@@ -142,7 +129,7 @@ public class DefaultSorter extends ExternalSorter implements IndexedSortable {
+ "\": " + sortmb);
}
- indexCacheMemoryLimit = job.getInt(TezJobConfig.TEZ_ENGINE_INDEX_CACHE_MEMORY_LIMIT_BYTES,
+ indexCacheMemoryLimit = this.conf.getInt(TezJobConfig.TEZ_ENGINE_INDEX_CACHE_MEMORY_LIMIT_BYTES,
TezJobConfig.DEFAULT_TEZ_ENGINE_INDEX_CACHE_MEMORY_LIMIT_BYTES);
// buffers and accounting
@@ -172,7 +159,7 @@ public class DefaultSorter extends ExternalSorter implements IndexedSortable {
keySerializer.open(bb);
spillInProgress = false;
- minSpillsForCombine = job.getInt(TezJobConfig.TEZ_ENGINE_COMBINE_MIN_SPILLS, 3);
+ minSpillsForCombine = this.conf.getInt(TezJobConfig.TEZ_ENGINE_COMBINE_MIN_SPILLS, 3);
spillThread.setDaemon(true);
spillThread.setName("SpillThread");
spillLock.lock();
@@ -194,7 +181,7 @@ public class DefaultSorter extends ExternalSorter implements IndexedSortable {
@Override
public void write(Object key, Object value)
- throws IOException, InterruptedException {
+ throws IOException {
collect(
key, value, partitioner.getPartition(key, value, partitions));
}
@@ -206,7 +193,7 @@ public class DefaultSorter extends ExternalSorter implements IndexedSortable {
*/
synchronized void collect(Object key, Object value, final int partition
) throws IOException {
- runningTaskContext.getTaskReporter().progress();
+
if (key.getClass() != keyClass) {
throw new IOException("Type mismatch in key from map: expected "
+ keyClass.getName() + ", received "
@@ -571,7 +558,6 @@ public class DefaultSorter extends ExternalSorter implements IndexedSortable {
// wait for spill
try {
while (spillInProgress) {
- runningTaskContext.getTaskReporter().progress();
spillDone.await();
}
} catch (InterruptedException e) {
@@ -598,12 +584,11 @@ public class DefaultSorter extends ExternalSorter implements IndexedSortable {
}
@Override
- public void flush() throws IOException, InterruptedException {
+ public void flush() throws IOException {
LOG.info("Starting flush of map output");
spillLock.lock();
try {
while (spillInProgress) {
- runningTaskContext.getTaskReporter().progress();
spillDone.await();
}
checkSpillException();
@@ -655,7 +640,7 @@ public class DefaultSorter extends ExternalSorter implements IndexedSortable {
}
@Override
- public void close() throws IOException, InterruptedException { }
+ public void close() throws IOException { }
protected class SpillThread extends Thread {
@@ -698,10 +683,9 @@ public class DefaultSorter extends ExternalSorter implements IndexedSortable {
final Throwable lspillException = sortSpillException;
if (lspillException != null) {
if (lspillException instanceof Error) {
- final String logMsg = "Task " + task.getTaskAttemptId() + " failed : " +
- StringUtils.stringifyException(lspillException);
- runningTaskContext.getTaskReporter().reportFatalError(
- task.getTaskAttemptId(), lspillException, logMsg);
+ final String logMsg = "Task " + outputContext.getUniqueIdentifier()
+ + " failed : " + StringUtils.stringifyException(lspillException);
+ outputContext.fatalError(lspillException, logMsg);
}
throw new IOException("Spill failed", lspillException);
}
@@ -739,7 +723,7 @@ public class DefaultSorter extends ExternalSorter implements IndexedSortable {
throws IOException, InterruptedException {
final int mstart = getMetaStart();
final int mend = getMetaEnd();
- sorter.sort(this, mstart, mend, runningTaskContext.getTaskReporter());
+ sorter.sort(this, mstart, mend, nullProgressable);
spill(mstart, mend);
}
@@ -766,7 +750,7 @@ public class DefaultSorter extends ExternalSorter implements IndexedSortable {
IFile.Writer writer = null;
try {
long segmentStart = out.getPos();
- writer = new Writer(job, out, keyClass, valClass, codec,
+ writer = new Writer(conf, out, keyClass, valClass, codec,
spilledRecordsCounter);
if (combineProcessor == null) {
// spill directly
@@ -824,7 +808,7 @@ public class DefaultSorter extends ExternalSorter implements IndexedSortable {
Path indexFilename =
mapOutputFile.getSpillIndexFileForWrite(numSpills, partitions
* MAP_OUTPUT_INDEX_RECORD_LENGTH);
- spillRec.writeToFile(indexFilename, job);
+ spillRec.writeToFile(indexFilename, conf);
} else {
indexCacheList.add(spillRec);
totalIndexCacheMemory +=
@@ -859,7 +843,7 @@ public class DefaultSorter extends ExternalSorter implements IndexedSortable {
try {
long segmentStart = out.getPos();
// Create a new codec, don't care!
- writer = new IFile.Writer(job, out, keyClass, valClass, codec,
+ writer = new IFile.Writer(conf, out, keyClass, valClass, codec,
spilledRecordsCounter);
if (i == partition) {
@@ -890,7 +874,7 @@ public class DefaultSorter extends ExternalSorter implements IndexedSortable {
Path indexFilename =
mapOutputFile.getSpillIndexFileForWrite(numSpills, partitions
* MAP_OUTPUT_INDEX_RECORD_LENGTH);
- spillRec.writeToFile(indexFilename, job);
+ spillRec.writeToFile(indexFilename, conf);
} else {
indexCacheList.add(spillRec);
totalIndexCacheMemory +=
@@ -988,12 +972,12 @@ public class DefaultSorter extends ExternalSorter implements IndexedSortable {
public void close() { }
}
- private void mergeParts() throws IOException, InterruptedException {
+ private void mergeParts() throws IOException {
// get the approximate size of the final output/index files
long finalOutFileSize = 0;
long finalIndexFileSize = 0;
final Path[] filename = new Path[numSpills];
- final TezTaskAttemptID mapId = task.getTaskAttemptId();
+ final String taskIdentifier = outputContext.getUniqueIdentifier();
for(int i = 0; i < numSpills; i++) {
filename[i] = mapOutputFile.getSpillFile(i);
@@ -1007,16 +991,15 @@ public class DefaultSorter extends ExternalSorter implements IndexedSortable {
mapOutputFile.getOutputIndexFileForWriteInVolume(filename[0]));
} else {
indexCacheList.get(0).writeToFile(
- mapOutputFile.getOutputIndexFileForWriteInVolume(filename[0]), job);
+ mapOutputFile.getOutputIndexFileForWriteInVolume(filename[0]), conf);
}
- sortPhase.complete();
return;
}
// read in paged indices
for (int i = indexCacheList.size(); i < numSpills; ++i) {
Path indexFileName = mapOutputFile.getSpillIndexFile(i);
- indexCacheList.add(new TezSpillRecord(indexFileName, job));
+ indexCacheList.add(new TezSpillRecord(indexFileName, conf));
}
//make correction in the length to include the sequence file header
@@ -1039,7 +1022,7 @@ public class DefaultSorter extends ExternalSorter implements IndexedSortable {
for (int i = 0; i < partitions; i++) {
long segmentStart = finalOut.getPos();
Writer writer =
- new Writer(job, finalOut, keyClass, valClass, codec, null);
+ new Writer(conf, finalOut, keyClass, valClass, codec, null);
writer.close();
TezIndexRecord rec =
@@ -1049,15 +1032,13 @@ public class DefaultSorter extends ExternalSorter implements IndexedSortable {
writer.getCompressedLength());
sr.putIndex(rec, i);
}
- sr.writeToFile(finalIndexFile, job);
+ sr.writeToFile(finalIndexFile, conf);
} finally {
finalOut.close();
}
- sortPhase.complete();
return;
}
else {
- sortPhase.addPhases(partitions); // Divide sort phase into sub-phases
TezMerger.considerFinalMergeForProgress();
final TezSpillRecord spillRec = new TezSpillRecord(partitions);
@@ -1069,12 +1050,12 @@ public class DefaultSorter extends ExternalSorter implements IndexedSortable {
TezIndexRecord indexRecord = indexCacheList.get(i).getIndex(parts);
Segment s =
- new Segment(job, rfs, filename[i], indexRecord.getStartOffset(),
+ new Segment(conf, rfs, filename[i], indexRecord.getStartOffset(),
indexRecord.getPartLength(), codec, true);
segmentList.add(i, s);
if (LOG.isDebugEnabled()) {
- LOG.debug("MapId=" + mapId + " Reducer=" + parts +
+ LOG.debug("TaskIdentifier=" + taskIdentifier + " Partition=" + parts +
"Spill =" + i + "(" + indexRecord.getStartOffset() + "," +
indexRecord.getRawLength() + ", " +
indexRecord.getPartLength() + ")");
@@ -1082,34 +1063,33 @@ public class DefaultSorter extends ExternalSorter implements IndexedSortable {
}
int mergeFactor =
- job.getInt(TezJobConfig.TEZ_ENGINE_IO_SORT_FACTOR,
+ this.conf.getInt(TezJobConfig.TEZ_ENGINE_IO_SORT_FACTOR,
TezJobConfig.DEFAULT_TEZ_ENGINE_IO_SORT_FACTOR);
// sort the segments only if there are intermediate merges
boolean sortSegments = segmentList.size() > mergeFactor;
//merge
- TezRawKeyValueIterator kvIter = TezMerger.merge(job, rfs,
+ TezRawKeyValueIterator kvIter = TezMerger.merge(conf, rfs,
keyClass, valClass, codec,
segmentList, mergeFactor,
- new Path(mapId.toString()),
- (RawComparator)ConfigUtils.getIntermediateOutputKeyComparator(job),
- runningTaskContext.getTaskReporter(), sortSegments,
+ new Path(taskIdentifier),
+ (RawComparator)ConfigUtils.getIntermediateOutputKeyComparator(conf),
+ nullProgressable, sortSegments,
null, spilledRecordsCounter,
- sortPhase.phase());
+ null); // Not using any Progress in TezMerger. Should just work.
//write merged output to disk
long segmentStart = finalOut.getPos();
Writer writer =
- new Writer(job, finalOut, keyClass, valClass, codec,
+ new Writer(conf, finalOut, keyClass, valClass, codec,
spilledRecordsCounter);
if (combineProcessor == null || numSpills < minSpillsForCombine) {
TezMerger.writeFile(kvIter, writer,
- runningTaskContext.getTaskReporter(), job);
+ nullProgressable, conf);
} else {
runCombineProcessor(kvIter, writer);
}
writer.close();
- sortPhase.startNextPhase();
// record offsets
final TezIndexRecord rec =
new TezIndexRecord(
@@ -1118,17 +1098,11 @@ public class DefaultSorter extends ExternalSorter implements IndexedSortable {
writer.getCompressedLength());
spillRec.putIndex(rec, parts);
}
- spillRec.writeToFile(finalIndexFile, job);
+ spillRec.writeToFile(finalIndexFile, conf);
finalOut.close();
for(int i = 0; i < numSpills; i++) {
rfs.delete(filename[i],true);
}
}
}
-
- @Override
- public OutputContext getOutputContext() {
- return null;
- }
-
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1cf7f197/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/dflt/InMemoryShuffleSorter.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/dflt/InMemoryShuffleSorter.java b/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/dflt/InMemoryShuffleSorter.java
index 6af6dab..53e6003 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/dflt/InMemoryShuffleSorter.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/dflt/InMemoryShuffleSorter.java
@@ -29,12 +29,10 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.util.DataChecksum;
-import org.apache.tez.common.TezTaskContext;
-import org.apache.tez.engine.api.Master;
import org.apache.tez.engine.common.shuffle.impl.ShuffleHeader;
import org.apache.tez.engine.common.shuffle.server.ShuffleHandler;
import org.apache.tez.engine.common.sort.impl.IFile;
-import org.apache.tez.engine.records.OutputContext;
+import org.apache.tez.engine.newapi.TezOutputContext;
public class InMemoryShuffleSorter extends DefaultSorter {
@@ -51,16 +49,11 @@ public class InMemoryShuffleSorter extends DefaultSorter {
byte[] kvbuffer;
IntBuffer kvmeta;
-
- public InMemoryShuffleSorter(TezTaskContext task) throws IOException {
- super(task);
- }
@Override
- public void initialize(Configuration conf, Master master) throws IOException,
- InterruptedException {
- super.initialize(conf, master);
- shuffleHandler.init(conf, runningTaskContext);
+ public void initialize(TezOutputContext outputContext, Configuration conf, int numOutputs) throws IOException {
+ super.initialize(outputContext, conf, numOutputs);
+ shuffleHandler.initialize(outputContext, conf);
}
@Override
@@ -98,7 +91,7 @@ public class InMemoryShuffleSorter extends DefaultSorter {
shuffleHeaders.add(
new ShuffleHeader(
- task.getTaskAttemptId().toString(),
+ outputContext.getUniqueIdentifier(), // TODO Verify that this is correct.
length + IFILE_CHECKSUM_LENGTH, length, i)
);
LOG.info("shuffleHeader[" + i + "]:" +
@@ -116,7 +109,7 @@ public class InMemoryShuffleSorter extends DefaultSorter {
}
@Override
- public void close() throws IOException, InterruptedException{
+ public void close() throws IOException {
// FIXME
//shuffleHandler.stop();
}
@@ -130,9 +123,4 @@ public class InMemoryShuffleSorter extends DefaultSorter {
return spillIndices.get(partition);
}
- @Override
- public OutputContext getOutputContext() {
- return new OutputContext(shuffleHandler.getPort());
- }
-
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1cf7f197/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/dflt/SortBufferInputStream.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/dflt/SortBufferInputStream.java b/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/dflt/SortBufferInputStream.java
index 47e6234..d74e159 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/dflt/SortBufferInputStream.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/dflt/SortBufferInputStream.java
@@ -30,7 +30,7 @@ import org.apache.hadoop.io.WritableUtils;
import org.apache.tez.engine.common.shuffle.impl.InMemoryWriter;
import org.apache.tez.engine.common.sort.impl.dflt.DefaultSorter.InMemValBytes;
-class SortBufferInputStream extends InputStream {
+ public class SortBufferInputStream extends InputStream {
private static final Log LOG = LogFactory.getLog(SortBufferInputStream.class);
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1cf7f197/tez-engine/src/main/java/org/apache/tez/engine/common/task/local/newoutput/TezLocalTaskOutputFiles.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/common/task/local/newoutput/TezLocalTaskOutputFiles.java b/tez-engine/src/main/java/org/apache/tez/engine/common/task/local/newoutput/TezLocalTaskOutputFiles.java
new file mode 100644
index 0000000..7f165eb
--- /dev/null
+++ b/tez-engine/src/main/java/org/apache/tez/engine/common/task/local/newoutput/TezLocalTaskOutputFiles.java
@@ -0,0 +1,234 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.engine.common.task.local.newoutput;
+
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalDirAllocator;
+import org.apache.hadoop.fs.Path;
+import org.apache.tez.common.Constants;
+import org.apache.tez.common.TezJobConfig;
+import org.apache.tez.engine.common.shuffle.impl.TaskAttemptIdentifier;
+
+/**
+ * Manipulate the working area for the transient store for maps and reduces.
+ *
+ * <p>This implementation resolves every intermediate file directly under
+ * {@link Constants#TASK_OUTPUT_DIR} within the directories configured by
+ * {@link TezJobConfig#LOCAL_DIRS}. It does not add a per-output sub-directory,
+ * so the {@code uniqueId} passed to the constructor does not affect any path.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class TezLocalTaskOutputFiles extends TezTaskOutput {
+
+  /** Picks among the directories listed in {@link TezJobConfig#LOCAL_DIRS}. */
+  private final LocalDirAllocator lDirAlloc =
+      new LocalDirAllocator(TezJobConfig.LOCAL_DIRS);
+
+  /**
+   * @param conf configuration that supplies {@link TezJobConfig#LOCAL_DIRS}
+   * @param uniqueId identifier of the owning output; kept by the base class but
+   *          not used when building paths here
+   */
+  public TezLocalTaskOutputFiles(Configuration conf, String uniqueId) {
+    super(conf, uniqueId);
+  }
+
+  /**
+   * Return the path to the local task output file created earlier.
+   *
+   * @return path
+   * @throws IOException if the path cannot be resolved
+   */
+  @Override
+  public Path getOutputFile() throws IOException {
+    return lDirAlloc.getLocalPathToRead(Constants.TASK_OUTPUT_DIR + Path.SEPARATOR
+        + Constants.TEZ_ENGINE_TASK_OUTPUT_FILENAME_STRING, conf);
+  }
+
+  /**
+   * Create a local task output file name.
+   *
+   * @param size the size of the file
+   * @return path
+   * @throws IOException if the path cannot be allocated
+   */
+  @Override
+  public Path getOutputFileForWrite(long size) throws IOException {
+    return lDirAlloc.getLocalPathForWrite(Constants.TASK_OUTPUT_DIR + Path.SEPARATOR
+        + Constants.TEZ_ENGINE_TASK_OUTPUT_FILENAME_STRING, size, conf);
+  }
+
+  /** Create a local task output file name in the directory of {@code existing}. */
+  @Override
+  public Path getOutputFileForWriteInVolume(Path existing) {
+    return new Path(existing.getParent(), Constants.TEZ_ENGINE_TASK_OUTPUT_FILENAME_STRING);
+  }
+
+  /**
+   * Return the path to the local task output index file created earlier.
+   *
+   * @return path
+   * @throws IOException if the path cannot be resolved
+   */
+  @Override
+  public Path getOutputIndexFile() throws IOException {
+    return lDirAlloc.getLocalPathToRead(Constants.TASK_OUTPUT_DIR + Path.SEPARATOR
+        + Constants.TEZ_ENGINE_TASK_OUTPUT_FILENAME_STRING
+        + Constants.TEZ_ENGINE_TASK_OUTPUT_INDEX_SUFFIX_STRING, conf);
+  }
+
+  /**
+   * Create a local task output index file name.
+   *
+   * @param size the size of the file
+   * @return path
+   * @throws IOException if the path cannot be allocated
+   */
+  @Override
+  public Path getOutputIndexFileForWrite(long size) throws IOException {
+    return lDirAlloc.getLocalPathForWrite(Constants.TASK_OUTPUT_DIR + Path.SEPARATOR
+        + Constants.TEZ_ENGINE_TASK_OUTPUT_FILENAME_STRING
+        + Constants.TEZ_ENGINE_TASK_OUTPUT_INDEX_SUFFIX_STRING, size, conf);
+  }
+
+  /** Create a local task output index file name in the directory of {@code existing}. */
+  @Override
+  public Path getOutputIndexFileForWriteInVolume(Path existing) {
+    return new Path(existing.getParent(),
+        Constants.TEZ_ENGINE_TASK_OUTPUT_FILENAME_STRING
+        + Constants.TEZ_ENGINE_TASK_OUTPUT_INDEX_SUFFIX_STRING);
+  }
+
+  /**
+   * Return a local spill file created earlier.
+   *
+   * @param spillNumber the number
+   * @return path
+   * @throws IOException if the path cannot be resolved
+   */
+  @Override
+  public Path getSpillFile(int spillNumber) throws IOException {
+    return lDirAlloc.getLocalPathToRead(Constants.TASK_OUTPUT_DIR + "/spill"
+        + spillNumber + ".out", conf);
+  }
+
+  /**
+   * Create a local spill file name.
+   *
+   * @param spillNumber the number
+   * @param size the size of the file
+   * @return path
+   * @throws IOException if the path cannot be allocated
+   */
+  @Override
+  public Path getSpillFileForWrite(int spillNumber, long size) throws IOException {
+    return lDirAlloc.getLocalPathForWrite(Constants.TASK_OUTPUT_DIR + "/spill"
+        + spillNumber + ".out", size, conf);
+  }
+
+  /**
+   * Return a local spill index file created earlier.
+   *
+   * @param spillNumber the number
+   * @return path
+   * @throws IOException if the path cannot be resolved
+   */
+  @Override
+  public Path getSpillIndexFile(int spillNumber) throws IOException {
+    return lDirAlloc.getLocalPathToRead(Constants.TASK_OUTPUT_DIR + "/spill"
+        + spillNumber + ".out.index", conf);
+  }
+
+  /**
+   * Create a local spill index file name.
+   *
+   * @param spillNumber the number
+   * @param size the size of the file
+   * @return path
+   * @throws IOException if the path cannot be allocated
+   */
+  @Override
+  public Path getSpillIndexFileForWrite(int spillNumber, long size) throws IOException {
+    return lDirAlloc.getLocalPathForWrite(Constants.TASK_OUTPUT_DIR + "/spill"
+        + spillNumber + ".out.index", size, conf);
+  }
+
+  /**
+   * Return a local input file created earlier.
+   *
+   * @param mapId identifier of the source task; only its task index is used
+   * @return path
+   * @throws IOException if the path cannot be resolved
+   */
+  @Override
+  public Path getInputFile(TaskAttemptIdentifier mapId) throws IOException {
+    return lDirAlloc.getLocalPathToRead(String.format(
+        Constants.TEZ_ENGINE_TASK_INPUT_FILE_FORMAT_STRING,
+        Constants.TASK_OUTPUT_DIR, Integer.valueOf(mapId.getTaskIndex())), conf);
+  }
+
+  /**
+   * Create a local input file name.
+   *
+   * @param taskId index of the source task
+   * @param size the size of the file
+   * @return path
+   * @throws IOException if the path cannot be allocated
+   */
+  @Override
+  public Path getInputFileForWrite(int taskId, long size) throws IOException {
+    return lDirAlloc.getLocalPathForWrite(String.format(
+        Constants.TEZ_ENGINE_TASK_INPUT_FILE_FORMAT_STRING, Constants.TASK_OUTPUT_DIR, taskId),
+        size, conf);
+  }
+
+  /** Removes all of the files related to a task. */
+  @Override
+  public void removeAll() throws IOException {
+    deleteLocalFiles(Constants.TASK_OUTPUT_DIR);
+  }
+
+  /** Returns the configured local directories, or {@code null} if none are set. */
+  private String[] getLocalDirs() throws IOException {
+    return conf.getStrings(TezJobConfig.LOCAL_DIRS);
+  }
+
+  /**
+   * Recursively deletes {@code subdir} under every configured local directory.
+   * Does nothing when {@link TezJobConfig#LOCAL_DIRS} is not set.
+   */
+  private void deleteLocalFiles(String subdir) throws IOException {
+    String[] localDirs = getLocalDirs();
+    if (localDirs == null) {
+      return;
+    }
+    FileSystem localFs = FileSystem.getLocal(conf);
+    for (String localDir : localDirs) {
+      // recursive = true: the whole sub-tree under the local dir is removed.
+      localFs.delete(new Path(localDir, subdir), true);
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1cf7f197/tez-engine/src/main/java/org/apache/tez/engine/common/task/local/newoutput/TezTaskOutput.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/common/task/local/newoutput/TezTaskOutput.java b/tez-engine/src/main/java/org/apache/tez/engine/common/task/local/newoutput/TezTaskOutput.java
new file mode 100644
index 0000000..6779d32
--- /dev/null
+++ b/tez-engine/src/main/java/org/apache/tez/engine/common/task/local/newoutput/TezTaskOutput.java
@@ -0,0 +1,156 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.engine.common.task.local.newoutput;
+
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.tez.engine.common.shuffle.impl.TaskAttemptIdentifier;
+
+/**
+ * Manipulate the working area for the transient store of a task's intermediate files.
+ *
+ * <p>Implementations decide which local paths a task uses for its output file,
+ * output index file, spill files, spill index files and fetched input files, and
+ * how those paths are laid out (for example, whether they are scoped by {@link #uniqueId}).
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public abstract class TezTaskOutput {
+
+  /** Configuration supplied at construction; subclasses resolve local directories from it. */
+  protected Configuration conf;
+  /** Identifier of the owning task or output, supplied at construction. */
+  protected String uniqueId;
+
+  /**
+   * @param conf configuration used by implementations to locate local directories
+   * @param uniqueId identifier of the owning task or output
+   */
+  public TezTaskOutput(Configuration conf, String uniqueId) {
+    this.conf = conf;
+    this.uniqueId = uniqueId;
+  }
+
+  /**
+   * Return the path to the local output file created earlier.
+   *
+   * @return path
+   * @throws IOException if the path cannot be resolved
+   */
+  public abstract Path getOutputFile() throws IOException;
+
+  /**
+   * Create a local output file name.
+   *
+   * @param size the size of the file
+   * @return path
+   * @throws IOException if the path cannot be allocated
+   */
+  public abstract Path getOutputFileForWrite(long size) throws IOException;
+
+  /** Create a local output file name on the same volume as {@code existing}. */
+  public abstract Path getOutputFileForWriteInVolume(Path existing);
+
+  /**
+   * Return the path to the local output index file created earlier.
+   *
+   * @return path
+   * @throws IOException if the path cannot be resolved
+   */
+  public abstract Path getOutputIndexFile() throws IOException;
+
+  /**
+   * Create a local output index file name.
+   *
+   * @param size the size of the file
+   * @return path
+   * @throws IOException if the path cannot be allocated
+   */
+  public abstract Path getOutputIndexFileForWrite(long size) throws IOException;
+
+  /** Create a local output index file name on the same volume as {@code existing}. */
+  public abstract Path getOutputIndexFileForWriteInVolume(Path existing);
+
+  /**
+   * Return a local spill file created earlier.
+   *
+   * @param spillNumber the number
+   * @return path
+   * @throws IOException if the path cannot be resolved
+   */
+  public abstract Path getSpillFile(int spillNumber) throws IOException;
+
+  /**
+   * Create a local spill file name.
+   *
+   * @param spillNumber the number
+   * @param size the size of the file
+   * @return path
+   * @throws IOException if the path cannot be allocated
+   */
+  public abstract Path getSpillFileForWrite(int spillNumber, long size)
+      throws IOException;
+
+  /**
+   * Return a local spill index file created earlier.
+   *
+   * @param spillNumber the number
+   * @return path
+   * @throws IOException if the path cannot be resolved
+   */
+  public abstract Path getSpillIndexFile(int spillNumber) throws IOException;
+
+  /**
+   * Create a local spill index file name.
+   *
+   * @param spillNumber the number
+   * @param size the size of the file
+   * @return path
+   * @throws IOException if the path cannot be allocated
+   */
+  public abstract Path getSpillIndexFileForWrite(int spillNumber, long size)
+      throws IOException;
+
+  /**
+   * Return a local input file created earlier.
+   *
+   * @param attemptIdentifier the identifier for the source task
+   * @return path
+   * @throws IOException if the path cannot be resolved
+   */
+  public abstract Path getInputFile(TaskAttemptIdentifier attemptIdentifier) throws IOException;
+
+  /**
+   * Create a local input file name.
+   *
+   * @param taskIdentifier the index of the source task
+   * @param size the size of the file
+   * @return path
+   * @throws IOException if the path cannot be allocated
+   */
+  public abstract Path getInputFileForWrite(
+      int taskIdentifier, long size) throws IOException;
+
+  /** Removes all of the files related to a task. */
+  public abstract void removeAll() throws IOException;
+}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1cf7f197/tez-engine/src/main/java/org/apache/tez/engine/common/task/local/newoutput/TezTaskOutputFiles.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/common/task/local/newoutput/TezTaskOutputFiles.java b/tez-engine/src/main/java/org/apache/tez/engine/common/task/local/newoutput/TezTaskOutputFiles.java
new file mode 100644
index 0000000..ae9c6d8
--- /dev/null
+++ b/tez-engine/src/main/java/org/apache/tez/engine/common/task/local/newoutput/TezTaskOutputFiles.java
@@ -0,0 +1,232 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.engine.common.task.local.newoutput;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.LocalDirAllocator;
+import org.apache.hadoop.fs.Path;
+import org.apache.tez.common.Constants;
+import org.apache.tez.common.TezJobConfig;
+import org.apache.tez.engine.common.shuffle.impl.TaskAttemptIdentifier;
+
+/**
+ * Manipulate the working area for the transient store of a task's intermediate files.
+ *
+ * <p>Output and output-index files live in a per-output directory,
+ * {@code TASK_OUTPUT_DIR/<uniqueId>}, under the directories configured by
+ * {@link TezJobConfig#LOCAL_DIRS}. Spill files are named
+ * {@code <uniqueId>_spill_<n>.out} (with an extra {@code .index} suffix for
+ * spill indices). Reading input files and {@link #removeAll()} are not supported.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class TezTaskOutputFiles extends TezTaskOutput {
+
+  private static final Log LOG = LogFactory.getLog(TezTaskOutputFiles.class);
+
+  private static final String SPILL_FILE_PATTERN = "%s_spill_%d.out";
+  private static final String SPILL_INDEX_FILE_PATTERN = SPILL_FILE_PATTERN + ".index";
+
+  // assume configured to $localdir/usercache/$user/appcache/$appId
+  private final LocalDirAllocator lDirAlloc =
+      new LocalDirAllocator(TezJobConfig.LOCAL_DIRS);
+
+  /**
+   * @param conf configuration that supplies {@link TezJobConfig#LOCAL_DIRS}
+   * @param uniqueId identifier of the owning output; used to scope every path
+   */
+  public TezTaskOutputFiles(Configuration conf, String uniqueId) {
+    super(conf, uniqueId);
+  }
+
+  /** Returns {@code TASK_OUTPUT_DIR/<uniqueId>}, relative to a local directory. */
+  private Path getAttemptOutputDir() {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("getAttemptOutputDir: "
+          + Constants.TASK_OUTPUT_DIR + "/" + uniqueId);
+    }
+    return new Path(Constants.TASK_OUTPUT_DIR, uniqueId);
+  }
+
+  /**
+   * Return the path to the local output file created earlier.
+   *
+   * @return path
+   * @throws IOException if the path cannot be resolved
+   */
+  @Override
+  public Path getOutputFile() throws IOException {
+    Path attemptOutput =
+        new Path(getAttemptOutputDir(), Constants.TEZ_ENGINE_TASK_OUTPUT_FILENAME_STRING);
+    return lDirAlloc.getLocalPathToRead(attemptOutput.toString(), conf);
+  }
+
+  /**
+   * Create a local output file name.
+   *
+   * @param size the size of the file
+   * @return path
+   * @throws IOException if the path cannot be allocated
+   */
+  @Override
+  public Path getOutputFileForWrite(long size) throws IOException {
+    Path attemptOutput =
+        new Path(getAttemptOutputDir(), Constants.TEZ_ENGINE_TASK_OUTPUT_FILENAME_STRING);
+    return lDirAlloc.getLocalPathForWrite(attemptOutput.toString(), size, conf);
+  }
+
+  /** Create a local output file name under the parent directory of {@code existing}. */
+  @Override
+  public Path getOutputFileForWriteInVolume(Path existing) {
+    Path outputDir = new Path(existing.getParent(), Constants.TASK_OUTPUT_DIR);
+    Path attemptOutputDir = new Path(outputDir, uniqueId);
+    return new Path(attemptOutputDir, Constants.TEZ_ENGINE_TASK_OUTPUT_FILENAME_STRING);
+  }
+
+  /**
+   * Return the path to the local output index file created earlier.
+   *
+   * @return path
+   * @throws IOException if the path cannot be resolved
+   */
+  @Override
+  public Path getOutputIndexFile() throws IOException {
+    Path attemptIndexOutput =
+        new Path(getAttemptOutputDir(), Constants.TEZ_ENGINE_TASK_OUTPUT_FILENAME_STRING
+            + Constants.TEZ_ENGINE_TASK_OUTPUT_INDEX_SUFFIX_STRING);
+    return lDirAlloc.getLocalPathToRead(attemptIndexOutput.toString(), conf);
+  }
+
+  /**
+   * Create a local output index file name.
+   *
+   * @param size the size of the file
+   * @return path
+   * @throws IOException if the path cannot be allocated
+   */
+  @Override
+  public Path getOutputIndexFileForWrite(long size) throws IOException {
+    Path attemptIndexOutput =
+        new Path(getAttemptOutputDir(), Constants.TEZ_ENGINE_TASK_OUTPUT_FILENAME_STRING
+            + Constants.TEZ_ENGINE_TASK_OUTPUT_INDEX_SUFFIX_STRING);
+    return lDirAlloc.getLocalPathForWrite(attemptIndexOutput.toString(), size, conf);
+  }
+
+  /** Create a local output index file name under the parent directory of {@code existing}. */
+  @Override
+  public Path getOutputIndexFileForWriteInVolume(Path existing) {
+    Path outputDir = new Path(existing.getParent(), Constants.TASK_OUTPUT_DIR);
+    Path attemptOutputDir = new Path(outputDir, uniqueId);
+    return new Path(attemptOutputDir, Constants.TEZ_ENGINE_TASK_OUTPUT_FILENAME_STRING
+        + Constants.TEZ_ENGINE_TASK_OUTPUT_INDEX_SUFFIX_STRING);
+  }
+
+  /**
+   * Return a local spill file created earlier.
+   *
+   * @param spillNumber the number
+   * @return path
+   * @throws IOException if the path cannot be resolved
+   */
+  @Override
+  public Path getSpillFile(int spillNumber) throws IOException {
+    return lDirAlloc.getLocalPathToRead(
+        String.format(SPILL_FILE_PATTERN, uniqueId, spillNumber), conf);
+  }
+
+  /**
+   * Create a local spill file name.
+   *
+   * @param spillNumber the number
+   * @param size the size of the file
+   * @return path
+   * @throws IOException if the path cannot be allocated
+   */
+  @Override
+  public Path getSpillFileForWrite(int spillNumber, long size) throws IOException {
+    // Format exactly once; re-formatting the result would misread any '%' in uniqueId.
+    return lDirAlloc.getLocalPathForWrite(
+        String.format(SPILL_FILE_PATTERN, uniqueId, spillNumber), size, conf);
+  }
+
+  /**
+   * Return a local spill index file created earlier.
+   *
+   * @param spillNumber the number
+   * @return path
+   * @throws IOException if the path cannot be resolved
+   */
+  @Override
+  public Path getSpillIndexFile(int spillNumber) throws IOException {
+    return lDirAlloc.getLocalPathToRead(
+        String.format(SPILL_INDEX_FILE_PATTERN, uniqueId, spillNumber), conf);
+  }
+
+  /**
+   * Create a local spill index file name.
+   *
+   * @param spillNumber the number
+   * @param size the size of the file
+   * @return path
+   * @throws IOException if the path cannot be allocated
+   */
+  @Override
+  public Path getSpillIndexFileForWrite(int spillNumber, long size) throws IOException {
+    return lDirAlloc.getLocalPathForWrite(
+        String.format(SPILL_INDEX_FILE_PATTERN, uniqueId, spillNumber), size, conf);
+  }
+
+  /**
+   * Not supported by this implementation; always throws {@link UnsupportedOperationException}.
+   */
+  @Override
+  public Path getInputFile(TaskAttemptIdentifier attemptIdentifier) throws IOException {
+    throw new UnsupportedOperationException("Incompatible with LocalRunner");
+  }
+
+  /**
+   * Create a local input file name, inside this output's attempt directory,
+   * for data fetched from the given source task.
+   *
+   * @param srcTaskId index of the source task
+   * @param size the size of the file
+   * @return path
+   * @throws IOException if the path cannot be allocated
+   */
+  @Override
+  public Path getInputFileForWrite(int srcTaskId, long size) throws IOException {
+    // The input-file pattern is formatted with (directory, source task index), as in
+    // TezLocalTaskOutputFiles, so distinct source tasks get distinct file names.
+    return lDirAlloc.getLocalPathForWrite(String.format(
+        Constants.TEZ_ENGINE_TASK_INPUT_FILE_FORMAT_STRING,
+        getAttemptOutputDir().toString(), srcTaskId), size, conf);
+  }
+
+  /** Not supported by this implementation; always throws {@link UnsupportedOperationException}. */
+  @Override
+  public void removeAll() throws IOException {
+    throw new UnsupportedOperationException("Incompatible with LocalRunner");
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1cf7f197/tez-engine/src/main/java/org/apache/tez/engine/common/task/local/output/TezTaskOutput.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/common/task/local/output/TezTaskOutput.java b/tez-engine/src/main/java/org/apache/tez/engine/common/task/local/output/TezTaskOutput.java
index eb1fe5f..50d270b 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/common/task/local/output/TezTaskOutput.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/common/task/local/output/TezTaskOutput.java
@@ -40,7 +40,8 @@ import org.apache.tez.dag.records.TezTaskID;
@InterfaceStability.Unstable
public abstract class TezTaskOutput implements Configurable {
- private Configuration conf;
+ protected Configuration conf;
+ protected String uniqueId;
public TezTaskOutput() {
}
@@ -152,6 +153,14 @@ public abstract class TezTaskOutput implements Configurable {
/** Removes all of the files related to a task. */
public abstract void removeAll() throws IOException;
+ public void setUniqueIdentifier(String uniqueId) {
+ this.uniqueId = uniqueId;
+ }
+
+ public String getUniqueIdentifier() {
+ return this.uniqueId;
+ }
+
@Override
public void setConf(Configuration conf) {
this.conf = conf;
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1cf7f197/tez-engine/src/main/java/org/apache/tez/engine/hadoop/compat/NullProgressable.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/hadoop/compat/NullProgressable.java b/tez-engine/src/main/java/org/apache/tez/engine/hadoop/compat/NullProgressable.java
new file mode 100644
index 0000000..5071dd2
--- /dev/null
+++ b/tez-engine/src/main/java/org/apache/tez/engine/hadoop/compat/NullProgressable.java
@@ -0,0 +1,33 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.engine.hadoop.compat;
+
+import org.apache.hadoop.util.Progressable;
+
+public class NullProgressable implements Progressable {
+ // A Progressable whose progress() callback is a no-op.
+ public NullProgressable() {
+ // Nothing to initialize.
+ }
+
+ @Override
+ public void progress() {
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1cf7f197/tez-engine/src/main/java/org/apache/tez/engine/lib/input/LocalMergedInput.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/lib/input/LocalMergedInput.java b/tez-engine/src/main/java/org/apache/tez/engine/lib/input/LocalMergedInput.java
index 6b1eb10..8ae6bfe 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/lib/input/LocalMergedInput.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/lib/input/LocalMergedInput.java
@@ -18,65 +18,39 @@
package org.apache.tez.engine.lib.input;
import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.tez.common.TezEngineTaskContext;
-import org.apache.tez.common.TezTaskReporter;
-import org.apache.tez.engine.api.Input;
-import org.apache.tez.engine.api.Master;
-import org.apache.tez.engine.common.combine.CombineInput;
+import org.apache.tez.common.TezUtils;
import org.apache.tez.engine.common.localshuffle.LocalShuffle;
-import org.apache.tez.engine.common.sort.impl.TezRawKeyValueIterator;
+import org.apache.tez.engine.newapi.Event;
+import org.apache.tez.engine.newapi.LogicalInput;
+import org.apache.tez.engine.newapi.TezInputContext;
/**
- * {@link LocalMergedInput} in an {@link Input} which shuffles intermediate
+ * <code>LocalMergedInput</code> is a {@link LogicalInput} which shuffles intermediate
* sorted data, merges them and provides key/<values> to the consumer.
*/
public class LocalMergedInput extends ShuffledMergedInput {
- TezRawKeyValueIterator rIter = null;
-
- private Configuration conf;
- private CombineInput raw;
- public LocalMergedInput(TezEngineTaskContext task, int index) {
- super(task, index);
- }
-
- public void initialize(Configuration conf, Master master) throws IOException,
- InterruptedException {
- this.conf = conf;
-
- LocalShuffle shuffle =
- new LocalShuffle(task, runningTaskContext, this.conf, (TezTaskReporter)master);
- rIter = shuffle.run();
- raw = new CombineInput(rIter);
- }
-
- public boolean hasNext() throws IOException, InterruptedException {
- return raw.hasNext();
- }
+ // TODO NEWTEZ Fix CombineProcessor
+ //private CombineInput raw;
- public Object getNextKey() throws IOException, InterruptedException {
- return raw.getNextKey();
- }
-
- @SuppressWarnings({ "unchecked", "rawtypes" })
- public Iterable getNextValues()
- throws IOException, InterruptedException {
- return raw.getNextValues();
- }
-
- public float getProgress() throws IOException, InterruptedException {
- return raw.getProgress();
- }
+ @Override
+ public List<Event> initialize(TezInputContext inputContext) throws IOException {
+ this.inputContext = inputContext;
+ this.conf = TezUtils.createConfFromUserPayload(inputContext.getUserPayload());
- public void close() throws IOException {
- raw.close();
+ LocalShuffle localShuffle = new LocalShuffle(inputContext, conf, numInputs);
+ // TODO NEWTEZ async run and checkIfComplete methods
+ rawIter = localShuffle.run();
+ return Collections.emptyList();
}
- public TezRawKeyValueIterator getIterator() {
- return rIter;
+ @Override
+ public List<Event> close() throws IOException {
+ rawIter.close();
+ return Collections.emptyList();
}
-
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1cf7f197/tez-engine/src/main/java/org/apache/tez/engine/lib/input/ShuffledMergedInput.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/lib/input/ShuffledMergedInput.java b/tez-engine/src/main/java/org/apache/tez/engine/lib/input/ShuffledMergedInput.java
index b11e009..fa7054a 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/lib/input/ShuffledMergedInput.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/lib/input/ShuffledMergedInput.java
@@ -18,90 +18,160 @@
package org.apache.tez.engine.lib.input;
import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
-import org.apache.tez.common.RunningTaskContext;
-import org.apache.tez.common.TezEngineTaskContext;
-import org.apache.tez.common.TezTaskReporter;
-import org.apache.tez.engine.api.Input;
-import org.apache.tez.engine.api.Master;
-import org.apache.tez.engine.common.combine.CombineInput;
+import org.apache.hadoop.io.RawComparator;
+import org.apache.tez.common.TezUtils;
+import org.apache.tez.common.counters.TaskCounter;
+import org.apache.tez.common.counters.TezCounter;
+import org.apache.tez.engine.common.ConfigUtils;
+import org.apache.tez.engine.common.ValuesIterator;
import org.apache.tez.engine.common.shuffle.impl.Shuffle;
import org.apache.tez.engine.common.sort.impl.TezRawKeyValueIterator;
+import org.apache.tez.engine.newapi.Event;
+import org.apache.tez.engine.newapi.KVReader;
+import org.apache.tez.engine.newapi.LogicalInput;
+import org.apache.tez.engine.newapi.TezInputContext;
/**
- * {@link ShuffledMergedInput} in an {@link Input} which shuffles intermediate
- * sorted data, merges them and provides key/<values> to the consumer.
+ * <code>ShuffledMergedInput</code> is a {@link LogicalInput} which shuffles
+ * intermediate sorted data, merges them and provides key/<values> to the
+ * consumer.
+ *
+ * The Copy and Merge will be triggered by the initialization - which is handled
+ * by the Tez framework. Input is not consumable until the Copy and Merge are
+ * complete. Methods are provided to check for this, as well as to wait for
+ * completion. Attempting to get a reader on a non-complete input will block.
+ *
*/
-public class ShuffledMergedInput implements Input {
+public class ShuffledMergedInput implements LogicalInput {
static final Log LOG = LogFactory.getLog(ShuffledMergedInput.class);
- TezRawKeyValueIterator rIter = null;
- protected TezEngineTaskContext task;
- protected int index;
- protected RunningTaskContext runningTaskContext;
+ protected TezInputContext inputContext;
+ protected TezRawKeyValueIterator rawIter = null;
+ protected Configuration conf;
+ protected int numInputs = 0;
+ protected Shuffle shuffle;
+ @SuppressWarnings("rawtypes")
+ protected ValuesIterator vIter;
- private Configuration conf;
- private CombineInput raw;
- private int taskIndegree = 0;
-
- public ShuffledMergedInput(TezEngineTaskContext task, int index) {
- this.task = task;
- this.index = index;
- this.taskIndegree = this.task.getInputSpecList().get(this.index)
- .getNumInputs();
- }
-
- public void mergeWith(ShuffledMergedInput other) {
- this.taskIndegree += other.taskIndegree;
- }
-
- public void setTask(RunningTaskContext runningTaskContext) {
- this.runningTaskContext = runningTaskContext;
- }
+ private TezCounter inputKeyCounter;
+ private TezCounter inputValueCounter;
- public void initialize(Configuration conf, Master master) throws IOException,
- InterruptedException {
- this.conf = conf;
-
- Shuffle shuffle =
- new Shuffle(
- task, runningTaskContext, this.conf,
- taskIndegree,
- (TezTaskReporter)master,
- runningTaskContext.getCombineProcessor());
- rIter = shuffle.run();
+ @Override
+ public List<Event> initialize(TezInputContext inputContext) throws IOException {
+ this.inputContext = inputContext;
+ this.conf = TezUtils.createConfFromUserPayload(inputContext.getUserPayload());
- raw = new CombineInput(rIter);
+ this.inputKeyCounter = inputContext.getCounters().findCounter(TaskCounter.REDUCE_INPUT_GROUPS);
+ this.inputValueCounter = inputContext.getCounters().findCounter(TaskCounter.REDUCE_INPUT_RECORDS);
+
+ // Start the shuffle - copy and merge.
+ shuffle = new Shuffle(inputContext, this.conf, numInputs);
+ shuffle.run();
+
+ return Collections.emptyList();
}
- public boolean hasNext() throws IOException, InterruptedException {
- return raw.hasNext();
+ /**
+ * Check if the input is ready for consumption
+ *
+ * @return true if the input is ready for consumption, or if an error occurred
+ * processing fetching the input. false if the shuffle and merge are
+ * still in progress
+ */
+ public boolean isInputReady() {
+ return shuffle.isInputReady();
}
- public Object getNextKey() throws IOException, InterruptedException {
- return raw.getNextKey();
+ /**
+ * Waits for the input to become ready for consumption
+ * @throws IOException
+ * @throws InterruptedException
+ */
+ public void waitForInputReady() throws IOException, InterruptedException {
+ rawIter = shuffle.waitForInput();
+ createValuesIteartor();
}
- @SuppressWarnings({ "unchecked", "rawtypes" })
- public Iterable getNextValues()
- throws IOException, InterruptedException {
- return raw.getNextValues();
+ @Override
+ public List<Event> close() throws IOException {
+ if (rawIter != null) { rawIter.close(); } // rawIter stays null until waitForInputReady() has run.
+ return Collections.emptyList();
}
- public float getProgress() throws IOException, InterruptedException {
- return raw.getProgress();
+ /**
+ * Get a KVReader for the Input.<p> This method will block until the input is
+ * ready - i.e. the copy and merge stages are complete. Users can use the
+ * isInputReady method to check if the input is ready, which gives an
+ * indication of whether this method will block or not.
+ * <p>
+ * NOTE: All values for the current K-V pair must be read prior to invoking
+ * moveToNext. Once moveToNext() is called, the valueIterator from the
+ * previous K-V pair will throw an Exception
+ *
+ * @return a KVReader over the sorted input.
+ */
+ @Override
+ public KVReader getReader() throws IOException {
+ if (rawIter == null) { // Not waited on yet: block for copy+merge, which also builds vIter.
+ try {
+ waitForInputReady();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new IOException("Interrupted while waiting for input ready", e);
+ }
+ }
+ return new KVReader() {
+
+ @Override
+ public boolean moveToNext() throws IOException {
+ return vIter.moveToNext();
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public KVRecord getCurrentKV() {
+ return new KVRecord(vIter.getKey(), vIter.getValues());
+ }
+ };
}
- public void close() throws IOException {
- raw.close();
+ @Override
+ public void handleEvents(List<Event> inputEvents) {
+ shuffle.handleEvents(inputEvents);
}
- public TezRawKeyValueIterator getIterator() {
- return rIter;
+ @Override
+ public void setNumPhysicalInputs(int numInputs) {
+ this.numInputs = numInputs;
}
+ @SuppressWarnings({ "rawtypes", "unchecked" })
+ private void createValuesIteartor()
+ throws IOException {
+ vIter = new ValuesIterator(rawIter,
+ (RawComparator) ConfigUtils.getIntermediateInputKeyComparator(conf),
+ ConfigUtils.getIntermediateInputKeyClass(conf),
+ ConfigUtils.getIntermediateInputValueClass(conf), conf, inputKeyCounter, inputValueCounter);
+
+ }
+
+
+ // This functionality is currently broken. If there's inputs which need to be
+ // written to disk, there's a possibility that inputs from the different
+ // sources could clobber each others' output. Also the current structures do
+ // not have adequate information to de-dupe these (vertex name)
+// public void mergeWith(ShuffledMergedInput other) {
+// this.numInputs += other.getNumPhysicalInputs();
+// }
+//
+// public int getNumPhysicalInputs() {
+// return this.numInputs;
+// }
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1cf7f197/tez-engine/src/main/java/org/apache/tez/engine/lib/oldinput/LocalMergedInput.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/lib/oldinput/LocalMergedInput.java b/tez-engine/src/main/java/org/apache/tez/engine/lib/oldinput/LocalMergedInput.java
new file mode 100644
index 0000000..269fe81
--- /dev/null
+++ b/tez-engine/src/main/java/org/apache/tez/engine/lib/oldinput/LocalMergedInput.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.engine.lib.oldinput;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.tez.common.TezEngineTaskContext;
+import org.apache.tez.engine.api.Input;
+import org.apache.tez.engine.api.Master;
+import org.apache.tez.engine.common.sort.impl.TezRawKeyValueIterator;
+
+/**
+ * Stub {@link Input} for the old engine.api interfaces: every method is a no-op
+ * and nothing is shuffled or merged; readers get false/null/0f results.
+ */
+public class LocalMergedInput extends OldShuffledMergedInput {
+
+ public LocalMergedInput(TezEngineTaskContext task, int index) {
+ super(task, index);
+ }
+
+ public void initialize(Configuration conf, Master master) throws IOException,
+ InterruptedException {
+ }
+
+ public boolean hasNext() throws IOException, InterruptedException {
+ return false;
+ }
+
+ public Object getNextKey() throws IOException, InterruptedException {
+ return null;
+ }
+
+ @SuppressWarnings({ "unchecked", "rawtypes" })
+ public Iterable getNextValues()
+ throws IOException, InterruptedException {
+ return null;
+ }
+
+ public float getProgress() throws IOException, InterruptedException {
+ return 0f;
+ }
+
+ public void close() throws IOException {
+ }
+
+ public TezRawKeyValueIterator getIterator() {
+ return null;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1cf7f197/tez-engine/src/main/java/org/apache/tez/engine/lib/oldinput/OldShuffledMergedInput.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/lib/oldinput/OldShuffledMergedInput.java b/tez-engine/src/main/java/org/apache/tez/engine/lib/oldinput/OldShuffledMergedInput.java
new file mode 100644
index 0000000..c046a27
--- /dev/null
+++ b/tez-engine/src/main/java/org/apache/tez/engine/lib/oldinput/OldShuffledMergedInput.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.engine.lib.oldinput;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.tez.common.RunningTaskContext;
+import org.apache.tez.common.TezEngineTaskContext;
+import org.apache.tez.engine.api.Input;
+import org.apache.tez.engine.api.Master;
+import org.apache.tez.engine.common.sort.impl.TezRawKeyValueIterator;
+
+/**
+ * Stub {@link Input} for the old engine.api interfaces: every method is a no-op
+ * and nothing is shuffled or merged; readers get false/null/0f results.
+ */
+public class OldShuffledMergedInput implements Input {
+
+ // The constructor ignores its arguments; this class holds no state.
+ public OldShuffledMergedInput(TezEngineTaskContext task, int index) {
+ }
+
+ public void mergeWith(OldShuffledMergedInput other) {
+ }
+
+ public void setTask(RunningTaskContext runningTaskContext) {
+ }
+
+ public void initialize(Configuration conf, Master master) throws IOException,
+ InterruptedException {
+ }
+
+ public boolean hasNext() throws IOException, InterruptedException {
+ return false;
+ }
+
+ public Object getNextKey() throws IOException, InterruptedException {
+ return null;
+ }
+
+ @SuppressWarnings({ "unchecked", "rawtypes" })
+ public Iterable getNextValues()
+ throws IOException, InterruptedException {
+ return null;
+ }
+
+ public float getProgress() throws IOException, InterruptedException {
+ return 0f;
+ }
+
+ public void close() throws IOException {
+ }
+
+ public TezRawKeyValueIterator getIterator() {
+ return null;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1cf7f197/tez-engine/src/main/java/org/apache/tez/engine/lib/oldoutput/OldInMemorySortedOutput.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/lib/oldoutput/OldInMemorySortedOutput.java b/tez-engine/src/main/java/org/apache/tez/engine/lib/oldoutput/OldInMemorySortedOutput.java
new file mode 100644
index 0000000..9ac92ba
--- /dev/null
+++ b/tez-engine/src/main/java/org/apache/tez/engine/lib/oldoutput/OldInMemorySortedOutput.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.engine.lib.oldoutput;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.tez.common.RunningTaskContext;
+import org.apache.tez.common.TezEngineTaskContext;
+import org.apache.tez.engine.api.Master;
+import org.apache.tez.engine.api.Output;
+import org.apache.tez.engine.common.sort.SortingOutput;
+import org.apache.tez.engine.records.OutputContext;
+
+/**
+ * {@link OldInMemorySortedOutput} is a stub {@link Output} for the old engine.api
+ * interfaces: write() discards its input and getOutputContext() returns null.
+ */
+public class OldInMemorySortedOutput implements SortingOutput {
+
+ public OldInMemorySortedOutput(TezEngineTaskContext task) throws IOException {
+ }
+
+ public void initialize(Configuration conf, Master master)
+ throws IOException, InterruptedException {
+ }
+
+ public void setTask(RunningTaskContext task) {
+ }
+
+ public void write(Object key, Object value) throws IOException,
+ InterruptedException {
+ }
+
+ public void close() throws IOException, InterruptedException {
+ }
+
+ @Override
+ public OutputContext getOutputContext() {
+ return null;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1cf7f197/tez-engine/src/main/java/org/apache/tez/engine/lib/oldoutput/OldLocalOnFileSorterOutput.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/lib/oldoutput/OldLocalOnFileSorterOutput.java b/tez-engine/src/main/java/org/apache/tez/engine/lib/oldoutput/OldLocalOnFileSorterOutput.java
new file mode 100644
index 0000000..b7f913c
--- /dev/null
+++ b/tez-engine/src/main/java/org/apache/tez/engine/lib/oldoutput/OldLocalOnFileSorterOutput.java
@@ -0,0 +1,38 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.engine.lib.oldoutput;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.tez.common.TezEngineTaskContext;
+
+public class OldLocalOnFileSorterOutput extends OldOnFileSortedOutput {
+ // Stub for the old engine.api interfaces; close() below does nothing.
+ private static final Log LOG = LogFactory.getLog(OldLocalOnFileSorterOutput.class);
+
+ public OldLocalOnFileSorterOutput(TezEngineTaskContext task) throws IOException {
+ super(task);
+ }
+
+ @Override
+ public void close() throws IOException, InterruptedException {
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1cf7f197/tez-engine/src/main/java/org/apache/tez/engine/lib/oldoutput/OldOnFileSortedOutput.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/lib/oldoutput/OldOnFileSortedOutput.java b/tez-engine/src/main/java/org/apache/tez/engine/lib/oldoutput/OldOnFileSortedOutput.java
new file mode 100644
index 0000000..f259df9
--- /dev/null
+++ b/tez-engine/src/main/java/org/apache/tez/engine/lib/oldoutput/OldOnFileSortedOutput.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.engine.lib.oldoutput;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.tez.common.RunningTaskContext;
+import org.apache.tez.common.TezEngineTaskContext;
+import org.apache.tez.engine.api.Master;
+import org.apache.tez.engine.api.Output;
+import org.apache.tez.engine.common.sort.SortingOutput;
+import org.apache.tez.engine.records.OutputContext;
+
+/**
+ * {@link OldOnFileSortedOutput} is a stub {@link Output} for the old engine.api
+ * interfaces: write() discards its input and getOutputContext() returns null.
+ */
+public class OldOnFileSortedOutput implements SortingOutput {
+
+ public OldOnFileSortedOutput(TezEngineTaskContext task) throws IOException {
+ }
+
+ @Override
+ public void initialize(Configuration conf, Master master)
+ throws IOException, InterruptedException {
+ }
+
+ @Override
+ public void setTask(RunningTaskContext task) {
+ }
+
+ @Override
+ public void write(Object key, Object value) throws IOException,
+ InterruptedException {
+ }
+
+ @Override
+ public void close() throws IOException, InterruptedException {
+ }
+
+ @Override
+ public OutputContext getOutputContext() {
+ return null;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1cf7f197/tez-engine/src/main/java/org/apache/tez/engine/lib/output/InMemorySortedOutput.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/lib/output/InMemorySortedOutput.java b/tez-engine/src/main/java/org/apache/tez/engine/lib/output/InMemorySortedOutput.java
index 99486be..5d2a2ba 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/lib/output/InMemorySortedOutput.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/lib/output/InMemorySortedOutput.java
@@ -18,53 +18,64 @@
package org.apache.tez.engine.lib.output;
import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.tez.common.RunningTaskContext;
-import org.apache.tez.common.TezEngineTaskContext;
-import org.apache.tez.engine.api.Master;
-import org.apache.tez.engine.api.Output;
-import org.apache.tez.engine.common.sort.SortingOutput;
+import org.apache.tez.common.TezUtils;
import org.apache.tez.engine.common.sort.impl.dflt.InMemoryShuffleSorter;
-import org.apache.tez.engine.records.OutputContext;
+import org.apache.tez.engine.newapi.Event;
+import org.apache.tez.engine.newapi.KVWriter;
+import org.apache.tez.engine.newapi.LogicalOutput;
+import org.apache.tez.engine.newapi.Output;
+import org.apache.tez.engine.newapi.TezOutputContext;
+import org.apache.tez.engine.newapi.Writer;
/**
* {@link InMemorySortedOutput} is an {@link Output} which sorts key/value pairs
* written to it and persists it to a file.
*/
-public class InMemorySortedOutput implements SortingOutput {
+public class InMemorySortedOutput implements LogicalOutput {
protected InMemoryShuffleSorter sorter;
+ protected int numTasks;
+ protected TezOutputContext outputContext;
- public InMemorySortedOutput(TezEngineTaskContext task) throws IOException {
- sorter = new InMemoryShuffleSorter(task);
- }
-
- public void initialize(Configuration conf, Master master)
- throws IOException, InterruptedException {
- sorter.initialize(conf, master);
- }
- public void setTask(RunningTaskContext task) {
- sorter.setTask(task);
- }
-
- public void write(Object key, Object value) throws IOException,
- InterruptedException {
- sorter.write(key, value);
+ @Override
+ public List<Event> initialize(TezOutputContext outputContext)
+ throws IOException {
+ this.outputContext = outputContext;
+ this.sorter = new InMemoryShuffleSorter();
+ sorter.initialize(outputContext, TezUtils.createConfFromUserPayload(outputContext.getUserPayload()), numTasks);
+ return Collections.emptyList();
}
- public void close() throws IOException, InterruptedException {
- sorter.flush();
- sorter.close();
+ @Override
+ public Writer getWriter() throws IOException {
+ return new KVWriter() {
+
+ @Override
+ public void write(Object key, Object value) throws IOException {
+ sorter.write(key, value);
+ }
+ };
}
@Override
- public OutputContext getOutputContext() {
- return sorter.getOutputContext();
+ public void handleEvents(List<Event> outputEvents) {
+ // No events expected.
}
- public InMemoryShuffleSorter getSorter() {
- return sorter;
+ @Override
+ public void setNumPhysicalOutputs(int numOutputs) {
+ this.numTasks = numOutputs;
+ }
+
+ @Override
+ public List<Event> close() throws IOException {
+ sorter.flush();
+ sorter.close();
+ // TODO NEWTEZ Event generation; until then report no events (never null).
+ return Collections.emptyList();
}
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1cf7f197/tez-engine/src/main/java/org/apache/tez/engine/lib/output/LocalOnFileSorterOutput.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/lib/output/LocalOnFileSorterOutput.java b/tez-engine/src/main/java/org/apache/tez/engine/lib/output/LocalOnFileSorterOutput.java
index a954f6e..d23ac1e 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/lib/output/LocalOnFileSorterOutput.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/lib/output/LocalOnFileSorterOutput.java
@@ -19,39 +19,40 @@
package org.apache.tez.engine.lib.output;
import java.io.IOException;
+import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.tez.common.TezEngineTaskContext;
-import org.apache.tez.engine.common.task.local.output.TezTaskOutput;
+import org.apache.tez.engine.common.task.local.newoutput.TezTaskOutput;
+import org.apache.tez.engine.newapi.Event;
public class LocalOnFileSorterOutput extends OnFileSortedOutput {
private static final Log LOG = LogFactory.getLog(LocalOnFileSorterOutput.class);
- public LocalOnFileSorterOutput(TezEngineTaskContext task) throws IOException {
- super(task);
- }
+
@Override
- public void close() throws IOException, InterruptedException {
+ public List<Event> close() throws IOException {
LOG.debug("Closing LocalOnFileSorterOutput");
super.close();
TezTaskOutput mapOutputFile = sorter.getMapOutput();
- FileSystem localFs = FileSystem.getLocal(mapOutputFile.getConf());
+ FileSystem localFs = FileSystem.getLocal(conf);
Path src = mapOutputFile.getOutputFile();
Path dst =
mapOutputFile.getInputFileForWrite(
- sorter.getTaskAttemptId().getTaskID(),
+ outputContext.getTaskIndex(),
localFs.getFileStatus(src).getLen());
if (LOG.isDebugEnabled()) {
LOG.debug("Renaming src = " + src + ", dst = " + dst);
}
localFs.rename(src, dst);
+ // TODO NEWTEZ Event generation.
+ return null;
}
}
[4/4] git commit: TEZ-417. Change Shuffle Input/Output to work with
the new APIs (part of TEZ-398). (sseth)
Posted by ss...@apache.org.
TEZ-417. Change Shuffle Input/Output to work with the new APIs (part of
TEZ-398). (sseth)
Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/1cf7f197
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/1cf7f197
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/1cf7f197
Branch: refs/heads/TEZ-398
Commit: 1cf7f197dd71d5a01bb457bad1f3c79e93a3afbf
Parents: e5919fa
Author: Siddharth Seth <ss...@apache.org>
Authored: Tue Sep 10 21:49:10 2013 -0700
Committer: Siddharth Seth <ss...@apache.org>
Committed: Tue Sep 10 21:49:10 2013 -0700
----------------------------------------------------------------------
.../java/org/apache/tez/common/Constants.java | 3 +
.../java/org/apache/tez/common/IDUtils.java | 3 +-
.../org/apache/tez/common/TezJobConfig.java | 1 +
.../apache/tez/common/counters/TaskCounter.java | 1 +
.../org/apache/tez/engine/api/Partitioner.java | 35 --
.../tez/engine/newapi/TezTaskContext.java | 61 +++-
.../org/apache/tez/engine/api/Partitioner.java | 35 ++
.../tez/engine/common/TezEngineUtils.java | 39 +++
.../tez/engine/common/ValuesIterator.java | 192 +++++++++++
.../common/localshuffle/LocalShuffle.java | 52 ++-
.../common/security/DelegationTokenRenewal.java | 318 -------------------
.../common/shuffle/impl/EventFetcher.java | 212 -------------
.../tez/engine/common/shuffle/impl/Fetcher.java | 121 ++++---
.../common/shuffle/impl/InMemoryReader.java | 6 +-
.../tez/engine/common/shuffle/impl/MapHost.java | 18 +-
.../engine/common/shuffle/impl/MapOutput.java | 28 +-
.../common/shuffle/impl/MergeManager.java | 172 +++++-----
.../tez/engine/common/shuffle/impl/Shuffle.java | 281 ++++++++--------
.../shuffle/impl/ShuffleClientMetrics.java | 16 +-
.../shuffle/impl/ShuffleInputEventHandler.java | 132 ++++++++
.../common/shuffle/impl/ShuffleScheduler.java | 194 ++++++-----
.../shuffle/impl/TaskAttemptIdentifier.java | 95 ++++++
.../common/shuffle/server/ShuffleHandler.java | 25 +-
.../engine/common/sort/impl/ExternalSorter.java | 176 +++++-----
.../common/sort/impl/IFileOutputStream.java | 2 -
.../common/sort/impl/PipelinedSorter.java | 78 ++---
.../tez/engine/common/sort/impl/TezMerger.java | 1 +
.../common/sort/impl/dflt/DefaultSorter.java | 104 +++---
.../sort/impl/dflt/InMemoryShuffleSorter.java | 24 +-
.../sort/impl/dflt/SortBufferInputStream.java | 2 +-
.../newoutput/TezLocalTaskOutputFiles.java | 234 ++++++++++++++
.../task/local/newoutput/TezTaskOutput.java | 156 +++++++++
.../local/newoutput/TezTaskOutputFiles.java | 232 ++++++++++++++
.../common/task/local/output/TezTaskOutput.java | 11 +-
.../engine/hadoop/compat/NullProgressable.java | 33 ++
.../tez/engine/lib/input/LocalMergedInput.java | 68 ++--
.../engine/lib/input/ShuffledMergedInput.java | 184 +++++++----
.../engine/lib/oldinput/LocalMergedInput.java | 67 ++++
.../lib/oldinput/OldShuffledMergedInput.java | 74 +++++
.../lib/oldoutput/OldInMemorySortedOutput.java | 58 ++++
.../oldoutput/OldLocalOnFileSorterOutput.java | 38 +++
.../lib/oldoutput/OldOnFileSortedOutput.java | 62 ++++
.../engine/lib/output/InMemorySortedOutput.java | 71 +++--
.../lib/output/LocalOnFileSorterOutput.java | 17 +-
.../engine/lib/output/OnFileSortedOutput.java | 103 ++++--
.../org/apache/tez/engine/newapi/KVReader.java | 52 ++-
.../org/apache/tez/engine/newapi/KVWriter.java | 2 +-
.../engine/newapi/impl/TezInputContextImpl.java | 4 +-
.../newapi/impl/TezOutputContextImpl.java | 3 +
.../newapi/impl/TezProcessorContextImpl.java | 3 +
.../engine/newapi/impl/TezTaskContextImpl.java | 59 +++-
.../tez/engine/shuffle/common/ShuffleUtils.java | 56 ++++
.../tez/mapreduce/examples/MRRSleepJob.java | 8 +-
.../mapreduce/examples/OrderedWordCount.java | 8 +-
.../apache/tez/mapreduce/TestMRRJobsDAGApi.java | 20 +-
.../apache/hadoop/mapred/LocalJobRunnerTez.java | 6 +-
.../processor/reduce/ReduceProcessor.java | 24 +-
.../processor/map/TestMapProcessor.java | 154 +++++----
.../processor/reduce/TestReduceProcessor.java | 6 +-
.../org/apache/tez/mapreduce/YARNRunner.java | 8 +-
60 files changed, 2708 insertions(+), 1540 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1cf7f197/tez-common/src/main/java/org/apache/tez/common/Constants.java
----------------------------------------------------------------------
diff --git a/tez-common/src/main/java/org/apache/tez/common/Constants.java b/tez-common/src/main/java/org/apache/tez/common/Constants.java
index 9f1b20a..8ea2909 100644
--- a/tez-common/src/main/java/org/apache/tez/common/Constants.java
+++ b/tez-common/src/main/java/org/apache/tez/common/Constants.java
@@ -22,6 +22,8 @@ import org.apache.hadoop.classification.InterfaceAudience.Private;
public class Constants {
+ // TODO NEWTEZ Check which of these constants are expecting specific pieces of information which are being removed - like taskAttemptId
+
public static final String TEZ = "tez";
public static final String MAP_OUTPUT_FILENAME_STRING = "file.out";
@@ -31,6 +33,7 @@ public class Constants {
public static final int MAP_OUTPUT_INDEX_RECORD_LENGTH = 24;
public static String MERGED_OUTPUT_PREFIX = ".merged";
+ // TODO NEWTEZ Remove this constant once the old code is removed.
public static final String TEZ_ENGINE_TASK_ATTEMPT_ID =
"tez.engine.task.attempt.id";
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1cf7f197/tez-common/src/main/java/org/apache/tez/common/IDUtils.java
----------------------------------------------------------------------
diff --git a/tez-common/src/main/java/org/apache/tez/common/IDUtils.java b/tez-common/src/main/java/org/apache/tez/common/IDUtils.java
index e94d939..1270e5a 100644
--- a/tez-common/src/main/java/org/apache/tez/common/IDUtils.java
+++ b/tez-common/src/main/java/org/apache/tez/common/IDUtils.java
@@ -56,7 +56,7 @@ public class IDUtils {
}
throw new IllegalArgumentException(exceptionMsg);
}
-
+
/** Construct a TaskAttemptID object from given string
* @return constructed TaskAttemptID object or null if the given String is null
* @throws IllegalArgumentException if the given string is malformed
@@ -89,5 +89,4 @@ public class IDUtils {
}
throw new IllegalArgumentException(exceptionMsg);
}
-
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1cf7f197/tez-common/src/main/java/org/apache/tez/common/TezJobConfig.java
----------------------------------------------------------------------
diff --git a/tez-common/src/main/java/org/apache/tez/common/TezJobConfig.java b/tez-common/src/main/java/org/apache/tez/common/TezJobConfig.java
index 5a847f1..7d8730e 100644
--- a/tez-common/src/main/java/org/apache/tez/common/TezJobConfig.java
+++ b/tez-common/src/main/java/org/apache/tez/common/TezJobConfig.java
@@ -157,6 +157,7 @@ public class TezJobConfig {
"tez.engine.shuffle.use.in-memory";
public static final boolean DEFAULT_TEZ_ENGINE_SHUFFLE_USE_IN_MEMORY = false;
+ // TODO NEWTEZ Remove these config parameters. Will be part of an event.
@Private
public static final String TEZ_ENGINE_SHUFFLE_PARTITION_RANGE =
"tez.engine.shuffle.partition-range";
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1cf7f197/tez-common/src/main/java/org/apache/tez/common/counters/TaskCounter.java
----------------------------------------------------------------------
diff --git a/tez-common/src/main/java/org/apache/tez/common/counters/TaskCounter.java b/tez-common/src/main/java/org/apache/tez/common/counters/TaskCounter.java
index 60ad1c9..b6fca27 100644
--- a/tez-common/src/main/java/org/apache/tez/common/counters/TaskCounter.java
+++ b/tez-common/src/main/java/org/apache/tez/common/counters/TaskCounter.java
@@ -29,6 +29,7 @@ import org.apache.hadoop.classification.InterfaceStability;
@InterfaceAudience.Public
@InterfaceStability.Evolving
public enum TaskCounter {
+ // TODO Eventually, rename counters to be non-MR specific and map them to MR equivalent.
MAP_INPUT_RECORDS,
MAP_OUTPUT_RECORDS,
MAP_SKIPPED_RECORDS,
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1cf7f197/tez-engine-api/src/main/java/org/apache/tez/engine/api/Partitioner.java
----------------------------------------------------------------------
diff --git a/tez-engine-api/src/main/java/org/apache/tez/engine/api/Partitioner.java b/tez-engine-api/src/main/java/org/apache/tez/engine/api/Partitioner.java
deleted file mode 100644
index cbef463..0000000
--- a/tez-engine-api/src/main/java/org/apache/tez/engine/api/Partitioner.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.tez.engine.api;
-
-/**
- * {@link Partitioner} is used by the TEZ framework to partition
- * output key/value pairs.
- */
-public interface Partitioner {
-
- /**
- * Get partition for given key/value.
- * @param key key
- * @param value value
- * @param numPartitions number of partitions
- * @return
- */
- int getPartition(Object key, Object value, int numPartitions);
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1cf7f197/tez-engine-api/src/main/java/org/apache/tez/engine/newapi/TezTaskContext.java
----------------------------------------------------------------------
diff --git a/tez-engine-api/src/main/java/org/apache/tez/engine/newapi/TezTaskContext.java b/tez-engine-api/src/main/java/org/apache/tez/engine/newapi/TezTaskContext.java
index 4cc5668..341377a 100644
--- a/tez-engine-api/src/main/java/org/apache/tez/engine/newapi/TezTaskContext.java
+++ b/tez-engine-api/src/main/java/org/apache/tez/engine/newapi/TezTaskContext.java
@@ -18,8 +18,10 @@
package org.apache.tez.engine.newapi;
+import java.nio.ByteBuffer;
import java.util.List;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.tez.common.counters.TezCounters;
/**
@@ -28,8 +30,18 @@ import org.apache.tez.common.counters.TezCounters;
*/
public interface TezTaskContext {
+ // TODO NEWTEZ
+ // Scale the maximum events we fetch per RPC call to mitigate OOM issues
+ // on the ApplicationMaster when a thundering herd of reducers fetch events
+ // This should not be necessary after HADOOP-8942
/**
+ * Get the {@link ApplicationId} for the running app
+ * @return the {@link ApplicationId}
+ */
+ public ApplicationId getApplicationId();
+
+ /**
* Get the index of this Task
* @return Task Index
*/
@@ -42,12 +54,17 @@ public interface TezTaskContext {
public int getAttemptNumber();
/**
+ * Get the name of the DAG
+ * @return the DAG name
+ */
+ public String getDAGName();
+
+ /**
* Get the name of the Vertex in which the task is running
* @return Vertex Name
*/
public String getTaskVertexName();
-
public TezCounters getCounters();
/**
@@ -62,4 +79,46 @@ public interface TezTaskContext {
*/
public byte[] getUserPayload();
+ /**
+ * Get the work directories for the Input/Output/Processor
+ * @return an array of work dirs
+ */
+ public String[] getWorkDirs();
+
+ /**
+ * Returns an identifier which is unique to the specific Input, Processor or
+ * Output
+ *
+ * @return the identifier unique to this Input, Processor or Output
+ */
+ public String getUniqueIdentifier();
+
+ /**
+ * Report a fatal error to the framework. This will cause the entire task to
+ * fail and should not be used for reporting temporary or recoverable errors.
+ * @param exception an exception representing the error
+ * @param message a message describing the error
+ */
+ public void fatalError(Throwable exception, String message);
+
+ /**
+ * Returns meta-data for the specified service. As an example, when the MR
+ * ShuffleHandler is used - this would return the jobToken serialized as bytes
+ *
+ * @param serviceName
+ * the name of the service for which meta-data is required
+ * @return a ByteBuffer representing the meta-data
+ */
+ public ByteBuffer getServiceConsumerMetaData(String serviceName);
+
+ /**
+ * Return provider meta-data for the specified service. As an example, when the
+ * MR ShuffleHandler is used - this would return the shuffle port serialized
+ * as bytes
+ *
+ * @param serviceName
+ * the name of the service for which provider meta-data is required
+ * @return a ByteBuffer representing the meta-data
+ */
+ public ByteBuffer getServiceProviderMetaData(String serviceName);
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1cf7f197/tez-engine/src/main/java/org/apache/tez/engine/api/Partitioner.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/api/Partitioner.java b/tez-engine/src/main/java/org/apache/tez/engine/api/Partitioner.java
new file mode 100644
index 0000000..cbef463
--- /dev/null
+++ b/tez-engine/src/main/java/org/apache/tez/engine/api/Partitioner.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.engine.api;
+
+/**
+ * {@link Partitioner} is used by the TEZ framework to partition
+ * output key/value pairs.
+ */
+public interface Partitioner {
+
+ /**
+ * Get partition for given key/value.
+ * @param key key
+ * @param value value
+ * @param numPartitions number of partitions
+ * @return
+ */
+ int getPartition(Object key, Object value, int numPartitions);
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1cf7f197/tez-engine/src/main/java/org/apache/tez/engine/common/TezEngineUtils.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/common/TezEngineUtils.java b/tez-engine/src/main/java/org/apache/tez/engine/common/TezEngineUtils.java
new file mode 100644
index 0000000..b3287c9
--- /dev/null
+++ b/tez-engine/src/main/java/org/apache/tez/engine/common/TezEngineUtils.java
@@ -0,0 +1,39 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.engine.common;
+
+/** Identifier helpers. Locale.ROOT keeps the digits ASCII whatever the JVM default locale is. */
+public class TezEngineUtils {
+ /** Returns {@code <vertexName>_<taskIndex zero-padded to at least 6 digits>}. */
+ public static String getTaskIdentifier(String vertexName, int taskIndex) {
+ return String.format(java.util.Locale.ROOT, "%s_%06d", vertexName, taskIndex);
+ }
+ /** Returns {@code <taskIndex>_<taskAttemptNumber>}. */
+ public static String getTaskAttemptIdentifier(int taskIndex,
+ int taskAttemptNumber) {
+ return String.format(java.util.Locale.ROOT, "%d_%d", taskIndex, taskAttemptNumber);
+ }
+ // TODO Maybe include a dag name in this.
+ /** Returns {@code <vertexName>_<taskIndex, >=6 digits>_<taskAttemptNumber, >=2 digits>}. */
+ public static String getTaskAttemptIdentifier(String vertexName,
+ int taskIndex, int taskAttemptNumber) {
+ return String.format(java.util.Locale.ROOT, "%s_%06d_%02d", vertexName,
+ taskIndex, taskAttemptNumber);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1cf7f197/tez-engine/src/main/java/org/apache/tez/engine/common/ValuesIterator.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/common/ValuesIterator.java b/tez-engine/src/main/java/org/apache/tez/engine/common/ValuesIterator.java
new file mode 100644
index 0000000..a33d00b
--- /dev/null
+++ b/tez-engine/src/main/java/org/apache/tez/engine/common/ValuesIterator.java
@@ -0,0 +1,192 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.engine.common;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.io.RawComparator;
+import org.apache.hadoop.io.serializer.Deserializer;
+import org.apache.hadoop.io.serializer.SerializationFactory;
+import org.apache.tez.common.counters.TezCounter;
+import org.apache.tez.engine.common.sort.impl.TezRawKeyValueIterator;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Iterates values while keys match in sorted input.
+ *
+ * This class is not thread safe. Accessing methods from multiple threads will
+ * lead to corrupt data.
+ *
+ */
+public class ValuesIterator<KEY,VALUE> {
+ protected TezRawKeyValueIterator in; //input iterator
+ private KEY key; // current key
+ private KEY nextKey; // look-ahead key; handed back to the deserializer for reuse
+ private VALUE value; // current value
+ private boolean more; // more in file
+ private RawComparator<KEY> comparator;
+ private Deserializer<KEY> keyDeserializer;
+ private Deserializer<VALUE> valDeserializer;
+ private DataInputBuffer keyIn = new DataInputBuffer();
+ private DataInputBuffer valueIn = new DataInputBuffer();
+ private TezCounter inputKeyCounter;
+ private TezCounter inputValueCounter;
+
+ // Bumped on every key advance; values iterators compare against it to detect stale use.
+ private int keyCtr = 0;
+ private boolean hasMoreValues; // For the current key.
+ private boolean isFirstRecord = true;
+
+ /** inputKeyCounter is incremented once per distinct key, inputValueCounter once per value returned. */
+ public ValuesIterator (TezRawKeyValueIterator in,
+ RawComparator<KEY> comparator,
+ Class<KEY> keyClass,
+ Class<VALUE> valClass, Configuration conf,
+ TezCounter inputKeyCounter,
+ TezCounter inputValueCounter)
+ throws IOException {
+ this.in = in;
+ this.comparator = comparator;
+ this.inputKeyCounter = inputKeyCounter;
+ this.inputValueCounter = inputValueCounter;
+ SerializationFactory serializationFactory = new SerializationFactory(conf);
+ this.keyDeserializer = serializationFactory.getDeserializer(keyClass);
+ this.keyDeserializer.open(keyIn);
+ this.valDeserializer = serializationFactory.getDeserializer(valClass);
+ this.valDeserializer.open(this.valueIn);
+ }
+
+ TezRawKeyValueIterator getRawIterator() { return in; }
+
+ /**
+ * Move to the next distinct key, skipping any values of the current key
+ * that were not consumed through {@link #getValues()}.
+ * @return true if another key exists, otherwise false.
+ * @throws IOException if the underlying input cannot be read
+ */
+ public boolean moveToNext() throws IOException {
+ if (isFirstRecord) {
+ readNextKey();
+ key = nextKey;
+ nextKey = null;
+ hasMoreValues = more;
+ isFirstRecord = false;
+ if (more) {
+ inputKeyCounter.increment(1); // nextKey() only counts the keys after the first
+ }
+ } else {
+ nextKey();
+ }
+ return more;
+ }
+
+ /** The current key. */
+ public KEY getKey() {
+ return key;
+ }
+
+ /**
+ * Values of the current key, deserialized lazily as the iterator advances.
+ * An iterator must not be used once {@link #moveToNext()} has moved past its key.
+ */
+ public Iterable<VALUE> getValues() {
+ return new Iterable<VALUE>() {
+ @Override
+ public Iterator<VALUE> iterator() {
+ return new Iterator<VALUE>() {
+ private final int keyNumber = keyCtr; // the key this iterator belongs to
+
+ @Override
+ public boolean hasNext() {
+ return hasMoreValues;
+ }
+
+ @Override
+ public VALUE next() {
+ // Check for stale use first so misuse fails the same way regardless of the new key.
+ Preconditions.checkState(keyNumber == keyCtr,
+ "Cannot use values iterator on the previous K-V pair after moveToNext has been invoked to move to the next K-V pair");
+ if (!hasMoreValues) {
+ throw new NoSuchElementException("iterate past last value");
+ }
+ try {
+ readNextValue();
+ readNextKey();
+ } catch (IOException ie) {
+ throw new RuntimeException("problem advancing post rec#"+keyCtr, ie);
+ }
+ inputValueCounter.increment(1);
+ return value;
+ }
+
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException("Cannot remove elements");
+ }
+ };
+ }
+ };
+ }
+
+ /** Skip the rest of the current key's values, then make the look-ahead key current. */
+ private void nextKey() throws IOException {
+ // read until we find a new key
+ while (hasMoreValues) {
+ readNextKey();
+ }
+ if (more) {
+ inputKeyCounter.increment(1);
+ ++keyCtr;
+ }
+
+ // move the next key to the current one; the old key object becomes the look-ahead buffer
+ KEY tmpKey = key;
+ key = nextKey;
+ nextKey = tmpKey;
+ hasMoreValues = more;
+ }
+
+ /**
+ * Advance the raw iterator and deserialize its key into nextKey; hasMoreValues
+ * is set only when that key compares equal to the current key.
+ */
+ private void readNextKey() throws IOException {
+ more = in.next();
+ if (more) {
+ DataInputBuffer nextKeyBytes = in.getKey();
+ keyIn.reset(nextKeyBytes.getData(), nextKeyBytes.getPosition(), nextKeyBytes.getLength());
+ nextKey = keyDeserializer.deserialize(nextKey);
+ hasMoreValues = key != null && (comparator.compare(key, nextKey) == 0);
+ } else {
+ hasMoreValues = false;
+ }
+ }
+
+ /** Deserialize the value of the record the raw iterator is currently positioned on. */
+ private void readNextValue() throws IOException {
+ DataInputBuffer nextValueBytes = in.getValue();
+ valueIn.reset(nextValueBytes.getData(), nextValueBytes.getPosition(), nextValueBytes.getLength());
+ value = valDeserializer.deserialize(value);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1cf7f197/tez-engine/src/main/java/org/apache/tez/engine/common/localshuffle/LocalShuffle.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/common/localshuffle/LocalShuffle.java b/tez-engine/src/main/java/org/apache/tez/engine/common/localshuffle/LocalShuffle.java
index e19e2c8..38b04d3 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/common/localshuffle/LocalShuffle.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/common/localshuffle/LocalShuffle.java
@@ -29,25 +29,24 @@ import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.DefaultCodec;
import org.apache.hadoop.util.ReflectionUtils;
-import org.apache.tez.common.RunningTaskContext;
-import org.apache.tez.common.TezEngineTaskContext;
import org.apache.tez.common.TezJobConfig;
-import org.apache.tez.common.TezTaskReporter;
import org.apache.tez.common.counters.TaskCounter;
import org.apache.tez.common.counters.TezCounter;
import org.apache.tez.engine.common.ConfigUtils;
import org.apache.tez.engine.common.sort.impl.TezMerger;
import org.apache.tez.engine.common.sort.impl.TezRawKeyValueIterator;
-import org.apache.tez.engine.common.task.local.output.TezLocalTaskOutputFiles;
-import org.apache.tez.engine.common.task.local.output.TezTaskOutput;
+import org.apache.tez.engine.common.task.local.newoutput.TezLocalTaskOutputFiles;
+import org.apache.tez.engine.common.task.local.newoutput.TezTaskOutput;
+import org.apache.tez.engine.newapi.TezInputContext;
@SuppressWarnings({"rawtypes"})
public class LocalShuffle {
- private final TezEngineTaskContext taskContext;
- private final RunningTaskContext runningTaskContext;
+ // TODO NEWTEZ This is broken.
+
+ private final TezInputContext inputContext;
private final Configuration conf;
- private final int tasksInDegree;
+ private final int numInputs;
private final Class keyClass;
private final Class valClass;
@@ -60,18 +59,15 @@ public class LocalShuffle {
private final CompressionCodec codec;
private final TezTaskOutput mapOutputFile;
- public LocalShuffle(TezEngineTaskContext taskContext,
- RunningTaskContext runningTaskContext,
- Configuration conf,
- TezTaskReporter reporter
- ) throws IOException {
- this.taskContext = taskContext;
- this.runningTaskContext = runningTaskContext;
+ public LocalShuffle(TezInputContext inputContext, Configuration conf, int numInputs) throws IOException {
+ this.inputContext = inputContext;
this.conf = conf;
+ this.numInputs = numInputs;
+
this.keyClass = ConfigUtils.getIntermediateInputKeyClass(conf);
this.valClass = ConfigUtils.getIntermediateInputValueClass(conf);
this.comparator = ConfigUtils.getIntermediateInputKeyComparator(conf);
-
+
this.sortFactor =
conf.getInt(
TezJobConfig.TEZ_ENGINE_IO_SORT_FACTOR,
@@ -79,10 +75,9 @@ public class LocalShuffle {
this.rfs = FileSystem.getLocal(conf).getRaw();
- this.spilledRecordsCounter =
- reporter.getCounter(TaskCounter.SPILLED_RECORDS);
+ this.spilledRecordsCounter = inputContext.getCounters().findCounter(TaskCounter.SPILLED_RECORDS);
- // compression
+ // compression
if (ConfigUtils.isIntermediateInputCompressed(conf)) {
Class<? extends CompressionCodec> codecClass =
ConfigUtils.getIntermediateInputCompressorClass(conf, DefaultCodec.class);
@@ -90,19 +85,16 @@ public class LocalShuffle {
} else {
this.codec = null;
}
-
- this.tasksInDegree = taskContext.getInputSpecList().get(0).getNumInputs();
-
+
// Always local
- this.mapOutputFile = new TezLocalTaskOutputFiles();
- this.mapOutputFile.setConf(conf);
-
+ this.mapOutputFile = new TezLocalTaskOutputFiles(conf, inputContext.getUniqueIdentifier());
}
+
public TezRawKeyValueIterator run() throws IOException {
// Copy is complete, obviously!
- this.runningTaskContext.getProgress().addPhase("copy").complete();
+
// Merge
return TezMerger.merge(conf, rfs,
keyClass, valClass,
@@ -110,17 +102,17 @@ public class LocalShuffle {
getMapFiles(),
false,
sortFactor,
- new Path(taskContext.getTaskAttemptId().toString()),
+ new Path(inputContext.getUniqueIdentifier()), // TODO NEWTEZ This is likely broken
comparator,
- runningTaskContext.getTaskReporter(), spilledRecordsCounter, null, null);
+ null, spilledRecordsCounter, null, null);
}
private Path[] getMapFiles()
throws IOException {
List<Path> fileList = new ArrayList<Path>();
// for local jobs
- for(int i = 0; i < tasksInDegree; ++i) {
- fileList.add(mapOutputFile.getInputFile(i));
+ for(int i = 0; i < numInputs; ++i) {
+ //fileList.add(mapOutputFile.getInputFile(i));
}
return fileList.toArray(new Path[0]);
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1cf7f197/tez-engine/src/main/java/org/apache/tez/engine/common/security/DelegationTokenRenewal.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/common/security/DelegationTokenRenewal.java b/tez-engine/src/main/java/org/apache/tez/engine/common/security/DelegationTokenRenewal.java
deleted file mode 100644
index a3ac968..0000000
--- a/tez-engine/src/main/java/org/apache/tez/engine/common/security/DelegationTokenRenewal.java
+++ /dev/null
@@ -1,318 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.engine.common.security;
-
-import java.io.IOException;
-import java.security.PrivilegedExceptionAction;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Date;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.Set;
-import java.util.Timer;
-import java.util.TimerTask;
-import java.util.concurrent.LinkedBlockingQueue;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.security.Credentials;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.util.StringUtils;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
-
-
-@InterfaceAudience.Private
-@InterfaceStability.Unstable
-public class DelegationTokenRenewal {
- private static final Log LOG = LogFactory.getLog(DelegationTokenRenewal.class);
- public static final String SCHEME = "hdfs";
-
- /**
- * class that is used for keeping tracks of DT to renew
- *
- */
- private static class DelegationTokenToRenew {
- public final Token<?> token;
- public final ApplicationId jobId;
- public final Configuration conf;
- public long expirationDate;
- public TimerTask timerTask;
-
- public DelegationTokenToRenew(
- ApplicationId jId, Token<?> t,
- Configuration newConf, long newExpirationDate) {
- token = t;
- jobId = jId;
- conf = newConf;
- expirationDate = newExpirationDate;
- timerTask = null;
- if(token==null || jobId==null || conf==null) {
- throw new IllegalArgumentException("invalid params for Renew Token" +
- ";t="+token+";j="+jobId+";c="+conf);
- }
- }
- public void setTimerTask(TimerTask tTask) {
- timerTask = tTask;
- }
- @Override
- public String toString() {
- return token + ";exp="+expirationDate;
- }
- @Override
- public boolean equals (Object obj) {
- if (obj == this) {
- return true;
- } else if (obj == null || getClass() != obj.getClass()) {
- return false;
- } else {
- return token.equals(((DelegationTokenToRenew)obj).token);
- }
- }
- @Override
- public int hashCode() {
- return token.hashCode();
- }
- }
-
- // global single timer (daemon)
- private static Timer renewalTimer = new Timer(true);
-
- //delegation token canceler thread
- private static DelegationTokenCancelThread dtCancelThread =
- new DelegationTokenCancelThread();
- static {
- dtCancelThread.start();
- }
-
-
- //managing the list of tokens using Map
- // jobId=>List<tokens>
- private static Set<DelegationTokenToRenew> delegationTokens =
- Collections.synchronizedSet(new HashSet<DelegationTokenToRenew>());
-
- private static class DelegationTokenCancelThread extends Thread {
- private static class TokenWithConf {
- Token<?> token;
- Configuration conf;
- TokenWithConf(Token<?> token, Configuration conf) {
- this.token = token;
- this.conf = conf;
- }
- }
- private LinkedBlockingQueue<TokenWithConf> queue =
- new LinkedBlockingQueue<TokenWithConf>();
-
- public DelegationTokenCancelThread() {
- super("Delegation Token Canceler");
- setDaemon(true);
- }
- public void cancelToken(Token<?> token,
- Configuration conf) {
- TokenWithConf tokenWithConf = new TokenWithConf(token, conf);
- while (!queue.offer(tokenWithConf)) {
- LOG.warn("Unable to add token " + token + " for cancellation. " +
- "Will retry..");
- try {
- Thread.sleep(100);
- } catch (InterruptedException e) {
- throw new RuntimeException(e);
- }
- }
- }
-
- public void run() {
- while (true) {
- TokenWithConf tokenWithConf = null;
- try {
- tokenWithConf = queue.take();
- final TokenWithConf current = tokenWithConf;
-
- if (LOG.isDebugEnabled()) {
- LOG.debug("Canceling token " + tokenWithConf.token.getService());
- }
- // need to use doAs so that http can find the kerberos tgt
- UserGroupInformation.getLoginUser().doAs(
- new PrivilegedExceptionAction<Void>() {
-
- @Override
- public Void run() throws Exception {
- current.token.cancel(current.conf);
- return null;
- }
- });
- } catch (IOException e) {
- LOG.warn("Failed to cancel token " + tokenWithConf.token + " " +
- StringUtils.stringifyException(e));
- } catch (InterruptedException ie) {
- return;
- } catch (Throwable t) {
- LOG.warn("Got exception " + StringUtils.stringifyException(t) +
- ". Exiting..");
- System.exit(-1);
- }
- }
- }
- }
- //adding token
- private static void addTokenToList(DelegationTokenToRenew t) {
- delegationTokens.add(t);
- }
-
- public static synchronized void registerDelegationTokensForRenewal(
- ApplicationId jobId, Credentials ts, Configuration conf) throws IOException {
- if(ts==null)
- return; //nothing to add
-
- Collection <Token<?>> tokens = ts.getAllTokens();
- long now = System.currentTimeMillis();
-
- for (Token<?> t : tokens) {
- // first renew happens immediately
- if (t.isManaged()) {
- DelegationTokenToRenew dtr = new DelegationTokenToRenew(jobId, t, conf,
- now);
-
- addTokenToList(dtr);
-
- setTimerForTokenRenewal(dtr, true);
- LOG.info("registering token for renewal for service =" + t.getService()
- + " and jobID = " + jobId);
- }
- }
- }
-
- /**
- * Task - to renew a token
- *
- */
- private static class RenewalTimerTask extends TimerTask {
- private DelegationTokenToRenew dttr;
-
- RenewalTimerTask(DelegationTokenToRenew t) { dttr = t; }
-
- @Override
- public void run() {
- Token<?> token = dttr.token;
- long newExpirationDate=0;
- try {
- // need to use doAs so that http can find the kerberos tgt
- dttr.expirationDate = UserGroupInformation.getLoginUser().doAs(
- new PrivilegedExceptionAction<Long>() {
-
- @Override
- public Long run() throws Exception {
- return dttr.token.renew(dttr.conf);
- }
- });
-
- if (LOG.isDebugEnabled()) {
- LOG.debug("renewing for:" + token.getService() + ";newED="
- + dttr.expirationDate);
- }
- setTimerForTokenRenewal(dttr, false);// set the next one
- } catch (Exception e) {
- LOG.error("Exception renewing token" + token + ". Not rescheduled", e);
- removeFailedDelegationToken(dttr);
- }
- }
- }
-
- /**
- * find the soonest expiring token and set it for renew
- */
- private static void setTimerForTokenRenewal(
- DelegationTokenToRenew token, boolean firstTime) {
-
- // calculate timer time
- long now = System.currentTimeMillis();
- long renewIn;
- if(firstTime) {
- renewIn = now;
- } else {
- long expiresIn = (token.expirationDate - now);
- renewIn = now + expiresIn - expiresIn/10; // little before expiration
- }
-
- // need to create new timer every time
- TimerTask tTask = new RenewalTimerTask(token);
- token.setTimerTask(tTask); // keep reference to the timer
-
- renewalTimer.schedule(token.timerTask, new Date(renewIn));
- }
-
- /**
- * removing all tokens renewals
- */
- static public void close() {
- renewalTimer.cancel();
- delegationTokens.clear();
- }
-
- // cancel a token
- private static void cancelToken(DelegationTokenToRenew t) {
- dtCancelThread.cancelToken(t.token, t.conf);
- }
-
- /**
- * removing failed DT
- * @param jobId
- */
- private static void removeFailedDelegationToken(DelegationTokenToRenew t) {
- ApplicationId jobId = t.jobId;
- if (LOG.isDebugEnabled())
- LOG.debug("removing failed delegation token for jobid=" + jobId +
- ";t=" + t.token.getService());
- delegationTokens.remove(t);
- // cancel the timer
- if(t.timerTask!=null)
- t.timerTask.cancel();
- }
-
- /**
- * removing DT for completed jobs
- * @param jobId
- */
- public static void removeDelegationTokenRenewalForJob(ApplicationId jobId) {
- synchronized (delegationTokens) {
- Iterator<DelegationTokenToRenew> it = delegationTokens.iterator();
- while(it.hasNext()) {
- DelegationTokenToRenew dttr = it.next();
- if (dttr.jobId.equals(jobId)) {
- if (LOG.isDebugEnabled())
- LOG.debug("removing delegation token for jobid=" + jobId +
- ";t=" + dttr.token.getService());
-
- // cancel the timer
- if(dttr.timerTask!=null)
- dttr.timerTask.cancel();
-
- // cancel the token
- cancelToken(dttr);
-
- it.remove();
- }
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1cf7f197/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/EventFetcher.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/EventFetcher.java b/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/EventFetcher.java
deleted file mode 100644
index 51e05af..0000000
--- a/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/EventFetcher.java
+++ /dev/null
@@ -1,212 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.tez.engine.common.shuffle.impl;
-
-import java.io.IOException;
-import java.net.URI;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.tez.dag.records.TezTaskAttemptID;
-import org.apache.tez.engine.api.Master;
-import org.apache.tez.engine.records.TezDependentTaskCompletionEvent;
-import org.apache.tez.engine.records.TezTaskDependencyCompletionEventsUpdate;
-
-class EventFetcher extends Thread {
- private static final long SLEEP_TIME = 1000;
- private static final int MAX_RETRIES = 10;
- private static final int RETRY_PERIOD = 5000;
- private static final Log LOG = LogFactory.getLog(EventFetcher.class);
-
- private final TezTaskAttemptID reduce;
- private final Master umbilical;
- private final ShuffleScheduler scheduler;
- private int fromEventIdx = 0;
- private int maxEventsToFetch;
- private Shuffle shuffle = null;
-
- private int maxMapRuntime = 0;
-
- private volatile boolean stopped = false;
-
- public EventFetcher(TezTaskAttemptID reduce,
- Master umbilical,
- ShuffleScheduler scheduler,
- Shuffle shuffle,
- int maxEventsToFetch) {
- setName("EventFetcher for fetching Map Completion Events");
- setDaemon(true);
- this.reduce = reduce;
- this.umbilical = umbilical;
- this.scheduler = scheduler;
- this.shuffle = shuffle;
- this.maxEventsToFetch = maxEventsToFetch;
- }
-
- @Override
- public void run() {
- int failures = 0;
- LOG.info(reduce + " Thread started: " + getName());
-
- try {
- while (!stopped && !Thread.currentThread().isInterrupted()) {
- try {
- int numNewMaps = getMapCompletionEvents();
- failures = 0;
- if (numNewMaps > 0) {
- LOG.info(reduce + ": " + "Got " + numNewMaps + " new map-outputs");
- }
- LOG.debug("GetMapEventsThread about to sleep for " + SLEEP_TIME);
- if (!Thread.currentThread().isInterrupted()) {
- Thread.sleep(SLEEP_TIME);
- }
- } catch (InterruptedException e) {
- LOG.info("EventFetcher is interrupted.. Returning");
- return;
- } catch (IOException ie) {
- LOG.info("Exception in getting events", ie);
- // check to see whether to abort
- if (++failures >= MAX_RETRIES) {
- throw new IOException("too many failures downloading events", ie);
- }
- // sleep for a bit
- if (!Thread.currentThread().isInterrupted()) {
- Thread.sleep(RETRY_PERIOD);
- }
- }
- }
- } catch (InterruptedException e) {
- return;
- } catch (Throwable t) {
- shuffle.reportException(t);
- return;
- }
- }
-
- public void shutDown() {
- this.stopped = true;
- interrupt();
- try {
- join(5000);
- } catch(InterruptedException ie) {
- LOG.warn("Got interrupted while joining " + getName(), ie);
- }
- }
-
- /**
- * Queries the {@link TaskTracker} for a set of map-completion events
- * from a given event ID.
- * @throws IOException
- */
- protected int getMapCompletionEvents() throws IOException {
-
- int numNewMaps = 0;
- TezDependentTaskCompletionEvent events[] = null;
-
- do {
- TezTaskDependencyCompletionEventsUpdate update =
- umbilical.getDependentTasksCompletionEvents(
- fromEventIdx,
- maxEventsToFetch,
- reduce);
- events = update.getDependentTaskCompletionEvents();
- LOG.debug("Got " + events.length + " map completion events from " +
- fromEventIdx);
- // Check if the reset is required.
- // Since there is no ordering of the task completion events at the
- // reducer, the only option to sync with the new jobtracker is to reset
- // the events index
- if (update.shouldReset()) {
- fromEventIdx = 0;
- scheduler.resetKnownMaps();
- }
-
- // Update the last seen event ID
- fromEventIdx += events.length;
-
- // Process the TaskCompletionEvents:
- // 1. Save the SUCCEEDED maps in knownOutputs to fetch the outputs.
- // 2. Save the OBSOLETE/FAILED/KILLED maps in obsoleteOutputs to stop
- // fetching from those maps.
- // 3. Remove TIPFAILED maps from neededOutputs since we don't need their
- // outputs at all.
- for (TezDependentTaskCompletionEvent event : events) {
- byte[] userPayload = event.getUserPayload();
- if(userPayload != null) {
- shuffle.updateUserPayload(userPayload);
- }
- switch (event.getStatus()) {
- case SUCCEEDED:
- addMapHosts(event);
- numNewMaps ++;
- int duration = event.getTaskRunTime();
- if (duration > maxMapRuntime) {
- maxMapRuntime = duration;
- scheduler.informMaxMapRunTime(maxMapRuntime);
- }
- break;
- case FAILED:
- case KILLED:
- case OBSOLETE:
- scheduler.obsoleteMapOutput(event.getTaskAttemptID());
- LOG.info("Ignoring obsolete output of " + event.getStatus() +
- " map-task: '" + event.getTaskAttemptID() + "'");
- break;
- case TIPFAILED:
- scheduler.tipFailed(event.getTaskAttemptID().getTaskID());
- LOG.info("Ignoring output of failed map TIP: '" +
- event.getTaskAttemptID() + "'");
- break;
- }
- }
- } while (events.length == maxEventsToFetch);
-
- return numNewMaps;
- }
-
- private void addMapHosts(TezDependentTaskCompletionEvent event) {
- int reduceRange = shuffle.getReduceRange();
- for(int i=0; i<reduceRange; ++i) {
- int partitionId = reduce.getTaskID().getId()+i;
- URI u = getBaseURI(event.getTaskTrackerHttp(), partitionId);
- scheduler.addKnownMapOutput(u.getHost() + ":" + u.getPort(),
- partitionId,
- u.toString(),
- event.getTaskAttemptID());
- }
- }
-
- private URI getBaseURI(String url, int reduceId) {
- StringBuffer baseUrl = new StringBuffer(url);
- if (!url.endsWith("/")) {
- baseUrl.append("/");
- }
- baseUrl.append("mapOutput?job=");
- // TODO TEZ HACK to get shuffle working. ApplicationId vs JobId shuffle handler.
- // FIXME dag or application or ???
- String jobID = reduce.getTaskID().getVertexID().getDAGId().
- getApplicationId().toString().replace("application", "job");
-
- baseUrl.append(jobID);
- baseUrl.append("&reduce=");
- baseUrl.append(reduceId);
- baseUrl.append("&map=");
- URI u = URI.create(baseUrl.toString());
- return u;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1cf7f197/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/Fetcher.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/Fetcher.java b/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/Fetcher.java
index 0acceaf..86e5b56 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/Fetcher.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/Fetcher.java
@@ -43,17 +43,16 @@ import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.Decompressor;
import org.apache.hadoop.io.compress.DefaultCodec;
import org.apache.hadoop.security.ssl.SSLFactory;
-import org.apache.hadoop.util.Progressable;
import org.apache.hadoop.util.ReflectionUtils;
-import org.apache.tez.common.IDUtils;
import org.apache.tez.common.TezJobConfig;
-import org.apache.tez.common.TezTaskReporter;
import org.apache.tez.common.counters.TezCounter;
-import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.engine.common.ConfigUtils;
import org.apache.tez.engine.common.security.SecureShuffleUtils;
import org.apache.tez.engine.common.shuffle.impl.MapOutput.Type;
+import org.apache.tez.engine.common.shuffle.server.ShuffleHandler;
import org.apache.tez.engine.common.sort.impl.IFileInputStream;
+import org.apache.tez.engine.newapi.TezInputContext;
+import org.apache.tez.engine.shuffle.common.ShuffleUtils;
import com.google.common.annotations.VisibleForTesting;
@@ -63,8 +62,7 @@ class Fetcher extends Thread {
/** Basic/unit connection timeout (in milliseconds) */
private final static int UNIT_CONNECT_TIMEOUT = 60 * 1000;
-
- private final Progressable reporter;
+
private static enum ShuffleErrors{IO_ERROR, WRONG_LENGTH, BAD_ID, WRONG_MAP,
CONNECTION, WRONG_REDUCE}
@@ -99,27 +97,28 @@ class Fetcher extends Thread {
public Fetcher(Configuration job,
ShuffleScheduler scheduler, MergeManager merger,
- TezTaskReporter reporter, ShuffleClientMetrics metrics,
- Shuffle shuffle, SecretKey jobTokenSecret) {
+ ShuffleClientMetrics metrics,
+ Shuffle shuffle, TezInputContext inputContext) throws IOException {
this.job = job;
- this.reporter = reporter;
this.scheduler = scheduler;
this.merger = merger;
this.metrics = metrics;
this.shuffle = shuffle;
this.id = ++nextId;
- this.jobTokenSecret = jobTokenSecret;
- ioErrs = reporter.getCounter(SHUFFLE_ERR_GRP_NAME,
+ this.jobTokenSecret = ShuffleUtils
+ .getJobTokenSecretFromTokenBytes(inputContext
+ .getServiceConsumerMetaData(ShuffleHandler.MAPREDUCE_SHUFFLE_SERVICEID));
+ ioErrs = inputContext.getCounters().findCounter(SHUFFLE_ERR_GRP_NAME,
ShuffleErrors.IO_ERROR.toString());
- wrongLengthErrs = reporter.getCounter(SHUFFLE_ERR_GRP_NAME,
+ wrongLengthErrs = inputContext.getCounters().findCounter(SHUFFLE_ERR_GRP_NAME,
ShuffleErrors.WRONG_LENGTH.toString());
- badIdErrs = reporter.getCounter(SHUFFLE_ERR_GRP_NAME,
+ badIdErrs = inputContext.getCounters().findCounter(SHUFFLE_ERR_GRP_NAME,
ShuffleErrors.BAD_ID.toString());
- wrongMapErrs = reporter.getCounter(SHUFFLE_ERR_GRP_NAME,
+ wrongMapErrs = inputContext.getCounters().findCounter(SHUFFLE_ERR_GRP_NAME,
ShuffleErrors.WRONG_MAP.toString());
- connectionErrs = reporter.getCounter(SHUFFLE_ERR_GRP_NAME,
+ connectionErrs = inputContext.getCounters().findCounter(SHUFFLE_ERR_GRP_NAME,
ShuffleErrors.CONNECTION.toString());
- wrongReduceErrs = reporter.getCounter(SHUFFLE_ERR_GRP_NAME,
+ wrongReduceErrs = inputContext.getCounters().findCounter(SHUFFLE_ERR_GRP_NAME,
ShuffleErrors.WRONG_REDUCE.toString());
if (ConfigUtils.isIntermediateInputCompressed(job)) {
@@ -156,6 +155,7 @@ class Fetcher extends Thread {
}
}
}
+
public void run() {
try {
while (!stopped && !Thread.currentThread().isInterrupted()) {
@@ -221,28 +221,28 @@ class Fetcher extends Thread {
@VisibleForTesting
protected void copyFromHost(MapHost host) throws IOException {
// Get completed maps on 'host'
- List<TezTaskAttemptID> maps = scheduler.getMapsForHost(host);
+ List<TaskAttemptIdentifier> srcAttempts = scheduler.getMapsForHost(host);
// Sanity check to catch hosts with only 'OBSOLETE' maps,
// especially at the tail of large jobs
- if (maps.size() == 0) {
+ if (srcAttempts.size() == 0) {
return;
}
if(LOG.isDebugEnabled()) {
LOG.debug("Fetcher " + id + " going to fetch from " + host + " for: "
- + maps);
+ + srcAttempts);
}
// List of maps to be fetched yet
- Set<TezTaskAttemptID> remaining = new HashSet<TezTaskAttemptID>(maps);
+ Set<TaskAttemptIdentifier> remaining = new HashSet<TaskAttemptIdentifier>(srcAttempts);
// Construct the url and connect
DataInputStream input;
boolean connectSucceeded = false;
try {
- URL url = getMapOutputURL(host, maps);
+ URL url = getMapOutputURL(host, srcAttempts);
HttpURLConnection connection = openConnection(url);
// generate hash of the url
@@ -294,19 +294,19 @@ class Fetcher extends Thread {
// If connect did not succeed, just mark all the maps as failed,
// indirectly penalizing the host
if (!connectSucceeded) {
- for(TezTaskAttemptID left: remaining) {
+ for(TaskAttemptIdentifier left: remaining) {
scheduler.copyFailed(left, host, connectSucceeded);
}
} else {
// If we got a read error at this stage, it implies there was a problem
// with the first map, typically lost map. So, penalize only that map
// and add the rest
- TezTaskAttemptID firstMap = maps.get(0);
+ TaskAttemptIdentifier firstMap = srcAttempts.get(0);
scheduler.copyFailed(firstMap, host, connectSucceeded);
}
// Add back all the remaining maps, WITHOUT marking them as failed
- for(TezTaskAttemptID left: remaining) {
+ for(TaskAttemptIdentifier left: remaining) {
scheduler.putBackKnownMapOutput(host, left);
}
@@ -318,14 +318,14 @@ class Fetcher extends Thread {
// On any error, faildTasks is not null and we exit
// after putting back the remaining maps to the
// yet_to_be_fetched list and marking the failed tasks.
- TezTaskAttemptID[] failedTasks = null;
+ TaskAttemptIdentifier[] failedTasks = null;
while (!remaining.isEmpty() && failedTasks == null) {
failedTasks = copyMapOutput(host, input, remaining);
}
if(failedTasks != null && failedTasks.length > 0) {
LOG.warn("copyMapOutput failed for tasks "+Arrays.toString(failedTasks));
- for(TezTaskAttemptID left: failedTasks) {
+ for(TaskAttemptIdentifier left: failedTasks) {
scheduler.copyFailed(left, host, true);
}
}
@@ -338,19 +338,19 @@ class Fetcher extends Thread {
+ remaining.size() + " left.");
}
} finally {
- for (TezTaskAttemptID left : remaining) {
+ for (TaskAttemptIdentifier left : remaining) {
scheduler.putBackKnownMapOutput(host, left);
}
}
}
- private static TezTaskAttemptID[] EMPTY_ATTEMPT_ID_ARRAY = new TezTaskAttemptID[0];
+ private static TaskAttemptIdentifier[] EMPTY_ATTEMPT_ID_ARRAY = new TaskAttemptIdentifier[0];
- private TezTaskAttemptID[] copyMapOutput(MapHost host,
+ private TaskAttemptIdentifier[] copyMapOutput(MapHost host,
DataInputStream input,
- Set<TezTaskAttemptID> remaining) {
+ Set<TaskAttemptIdentifier> remaining) {
MapOutput mapOutput = null;
- TezTaskAttemptID mapId = null;
+ TaskAttemptIdentifier srcAttemptId = null;
long decompressedLength = -1;
long compressedLength = -1;
@@ -361,7 +361,8 @@ class Fetcher extends Thread {
try {
ShuffleHeader header = new ShuffleHeader();
header.readFields(input);
- mapId = IDUtils.toTaskAttemptId(header.mapId);
+ String pathComponent = header.mapId;
+ srcAttemptId = scheduler.getIdentifierForPathComponent(pathComponent);
compressedLength = header.compressedLength;
decompressedLength = header.uncompressedLength;
forReduce = header.forReduce;
@@ -369,23 +370,23 @@ class Fetcher extends Thread {
badIdErrs.increment(1);
LOG.warn("Invalid map id ", e);
//Don't know which one was bad, so consider all of them as bad
- return remaining.toArray(new TezTaskAttemptID[remaining.size()]);
+ return remaining.toArray(new TaskAttemptIdentifier[remaining.size()]);
}
// Do some basic sanity verification
if (!verifySanity(compressedLength, decompressedLength, forReduce,
- remaining, mapId)) {
- return new TezTaskAttemptID[] {mapId};
+ remaining, srcAttemptId)) {
+ return new TaskAttemptIdentifier[] {srcAttemptId};
}
if(LOG.isDebugEnabled()) {
- LOG.debug("header: " + mapId + ", len: " + compressedLength +
+ LOG.debug("header: " + srcAttemptId + ", len: " + compressedLength +
", decomp len: " + decompressedLength);
}
// Get the location for the map output - either in-memory or on-disk
- mapOutput = merger.reserve(mapId, decompressedLength, id);
+ mapOutput = merger.reserve(srcAttemptId, decompressedLength, id);
// Check if we can shuffle *now* ...
if (mapOutput.getType() == Type.WAIT) {
@@ -396,7 +397,7 @@ class Fetcher extends Thread {
// Go!
LOG.info("fetcher#" + id + " about to shuffle output of map " +
- mapOutput.getMapId() + " decomp: " +
+ mapOutput.getAttemptIdentifier() + " decomp: " +
decompressedLength + " len: " + compressedLength + " to " +
mapOutput.getType());
if (mapOutput.getType() == Type.MEMORY) {
@@ -408,32 +409,32 @@ class Fetcher extends Thread {
// Inform the shuffle scheduler
long endTime = System.currentTimeMillis();
- scheduler.copySucceeded(mapId, host, compressedLength,
+ scheduler.copySucceeded(srcAttemptId, host, compressedLength,
endTime - startTime, mapOutput);
// Note successful shuffle
- remaining.remove(mapId);
+ remaining.remove(srcAttemptId);
metrics.successFetch();
return null;
} catch (IOException ioe) {
ioErrs.increment(1);
- if (mapId == null || mapOutput == null) {
+ if (srcAttemptId == null || mapOutput == null) {
LOG.info("fetcher#" + id + " failed to read map header" +
- mapId + " decomp: " +
+ srcAttemptId + " decomp: " +
decompressedLength + ", " + compressedLength, ioe);
- if(mapId == null) {
- return remaining.toArray(new TezTaskAttemptID[remaining.size()]);
+ if(srcAttemptId == null) {
+ return remaining.toArray(new TaskAttemptIdentifier[remaining.size()]);
} else {
- return new TezTaskAttemptID[] {mapId};
+ return new TaskAttemptIdentifier[] {srcAttemptId};
}
}
- LOG.warn("Failed to shuffle output of " + mapId +
+ LOG.warn("Failed to shuffle output of " + srcAttemptId +
" from " + host.getHostName(), ioe);
// Inform the shuffle-scheduler
mapOutput.abort();
metrics.failedFetch();
- return new TezTaskAttemptID[] {mapId};
+ return new TaskAttemptIdentifier[] {srcAttemptId};
}
}
@@ -448,11 +449,11 @@ class Fetcher extends Thread {
* @return true/false, based on if the verification succeeded or not
*/
private boolean verifySanity(long compressedLength, long decompressedLength,
- int forReduce, Set<TezTaskAttemptID> remaining, TezTaskAttemptID mapId) {
+ int forReduce, Set<TaskAttemptIdentifier> remaining, TaskAttemptIdentifier srcAttemptId) {
if (compressedLength < 0 || decompressedLength < 0) {
wrongLengthErrs.increment(1);
LOG.warn(getName() + " invalid lengths in map output header: id: " +
- mapId + " len: " + compressedLength + ", decomp len: " +
+ srcAttemptId + " len: " + compressedLength + ", decomp len: " +
decompressedLength);
return false;
}
@@ -462,15 +463,15 @@ class Fetcher extends Thread {
if (forReduce < reduceStartId || forReduce >= reduceStartId+reduceRange) {
wrongReduceErrs.increment(1);
LOG.warn(getName() + " data for the wrong reduce map: " +
- mapId + " len: " + compressedLength + " decomp len: " +
+ srcAttemptId + " len: " + compressedLength + " decomp len: " +
decompressedLength + " for reduce " + forReduce);
return false;
}
// Sanity check
- if (!remaining.contains(mapId)) {
+ if (!remaining.contains(srcAttemptId)) {
wrongMapErrs.increment(1);
- LOG.warn("Invalid map-output! Received output for " + mapId);
+ LOG.warn("Invalid map-output! Received output for " + srcAttemptId);
return false;
}
@@ -485,17 +486,17 @@ class Fetcher extends Thread {
* @return
* @throws MalformedURLException
*/
- private URL getMapOutputURL(MapHost host, List<TezTaskAttemptID> maps
+ private URL getMapOutputURL(MapHost host, List<TaskAttemptIdentifier> srcAttempts
) throws MalformedURLException {
// Get the base url
StringBuffer url = new StringBuffer(host.getBaseUrl());
boolean first = true;
- for (TezTaskAttemptID mapId : maps) {
+ for (TaskAttemptIdentifier mapId : srcAttempts) {
if (!first) {
url.append(",");
}
- url.append(mapId);
+ url.append(mapId.getPathComponent());
first = false;
}
@@ -566,9 +567,8 @@ class Fetcher extends Thread {
try {
IOUtils.readFully(input, shuffleData, 0, shuffleData.length);
metrics.inputBytes(shuffleData.length);
- reporter.progress();
LOG.info("Read " + shuffleData.length + " bytes from map-output for " +
- mapOutput.getMapId());
+ mapOutput.getAttemptIdentifier());
} catch (IOException ioe) {
// Close the streams
IOUtils.cleanup(LOG, input);
@@ -593,17 +593,16 @@ class Fetcher extends Thread {
int n = input.read(buf, 0, (int) Math.min(bytesLeft, BYTES_TO_READ));
if (n < 0) {
throw new IOException("read past end of stream reading " +
- mapOutput.getMapId());
+ mapOutput.getAttemptIdentifier());
}
output.write(buf, 0, n);
bytesLeft -= n;
metrics.inputBytes(n);
- reporter.progress();
}
LOG.info("Read " + (compressedLength - bytesLeft) +
" bytes from map-output for " +
- mapOutput.getMapId());
+ mapOutput.getAttemptIdentifier());
output.close();
} catch (IOException ioe) {
@@ -617,7 +616,7 @@ class Fetcher extends Thread {
// Sanity check
if (bytesLeft != 0) {
throw new IOException("Incomplete map output received for " +
- mapOutput.getMapId() + " from " +
+ mapOutput.getAttemptIdentifier() + " from " +
host.getHostName() + " (" +
bytesLeft + " bytes missing of " +
compressedLength + ")"
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1cf7f197/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/InMemoryReader.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/InMemoryReader.java b/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/InMemoryReader.java
index 7cea558..d10ebaa 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/InMemoryReader.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/InMemoryReader.java
@@ -25,8 +25,6 @@ import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.io.DataInputBuffer;
-import org.apache.hadoop.io.DataOutputBuffer;
-import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.engine.common.sort.impl.IFile;
import org.apache.tez.engine.common.sort.impl.IFile.Reader;
@@ -36,14 +34,14 @@ import org.apache.tez.engine.common.sort.impl.IFile.Reader;
@InterfaceAudience.Private
@InterfaceStability.Unstable
public class InMemoryReader extends Reader {
- private final TezTaskAttemptID taskAttemptId;
+ private final TaskAttemptIdentifier taskAttemptId;
private final MergeManager merger;
DataInputBuffer memDataIn = new DataInputBuffer();
private int start;
private int length;
private int prevKeyPos;
- public InMemoryReader(MergeManager merger, TezTaskAttemptID taskAttemptId,
+ public InMemoryReader(MergeManager merger, TaskAttemptIdentifier taskAttemptId,
byte[] data, int start, int length)
throws IOException {
super(null, null, length - start, null, null);
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1cf7f197/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/MapHost.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/MapHost.java b/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/MapHost.java
index 24f7635..cd644de 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/MapHost.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/MapHost.java
@@ -20,9 +20,6 @@ package org.apache.tez.engine.common.shuffle.impl;
import java.util.ArrayList;
import java.util.List;
-import org.apache.tez.dag.records.TezTaskAttemptID;
-
-
class MapHost {
public static enum State {
@@ -37,7 +34,8 @@ class MapHost {
private final int partitionId;
private final String baseUrl;
private final String identifier;
- private List<TezTaskAttemptID> maps = new ArrayList<TezTaskAttemptID>();
+ // Tracks attempt IDs
+ private List<TaskAttemptIdentifier> maps = new ArrayList<TaskAttemptIdentifier>();
public MapHost(int partitionId, String hostName, String baseUrl) {
this.partitionId = partitionId;
@@ -70,16 +68,16 @@ class MapHost {
return baseUrl;
}
- public synchronized void addKnownMap(TezTaskAttemptID mapId) {
- maps.add(mapId);
+ public synchronized void addKnownMap(TaskAttemptIdentifier srcAttempt) {
+ maps.add(srcAttempt);
if (state == State.IDLE) {
state = State.PENDING;
}
}
-
- public synchronized List<TezTaskAttemptID> getAndClearKnownMaps() {
- List<TezTaskAttemptID> currentKnownMaps = maps;
- maps = new ArrayList<TezTaskAttemptID>();
+
+ public synchronized List<TaskAttemptIdentifier> getAndClearKnownMaps() {
+ List<TaskAttemptIdentifier> currentKnownMaps = maps;
+ maps = new ArrayList<TaskAttemptIdentifier>();
return currentKnownMaps;
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1cf7f197/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/MapOutput.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/MapOutput.java b/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/MapOutput.java
index 272709e..f0b48fd 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/MapOutput.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/MapOutput.java
@@ -29,8 +29,8 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalDirAllocator;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BoundedByteArrayOutputStream;
-import org.apache.tez.dag.records.TezTaskAttemptID;
-import org.apache.tez.engine.common.task.local.output.TezTaskOutputFiles;
+import org.apache.tez.engine.common.task.local.newoutput.TezTaskOutputFiles;
+
class MapOutput {
private static final Log LOG = LogFactory.getLog(MapOutput.class);
@@ -42,10 +42,10 @@ class MapOutput {
DISK
}
+ private TaskAttemptIdentifier attemptIdentifier;
private final int id;
private final MergeManager merger;
- private final TezTaskAttemptID mapId;
private final long size;
@@ -61,13 +61,13 @@ class MapOutput {
private final boolean primaryMapOutput;
- MapOutput(TezTaskAttemptID mapId, MergeManager merger, long size,
+ MapOutput(TaskAttemptIdentifier attemptIdentifier, MergeManager merger, long size,
Configuration conf, LocalDirAllocator localDirAllocator,
int fetcher, boolean primaryMapOutput,
TezTaskOutputFiles mapOutputFile)
throws IOException {
this.id = ID.incrementAndGet();
- this.mapId = mapId;
+ this.attemptIdentifier = attemptIdentifier;
this.merger = merger;
type = Type.DISK;
@@ -79,7 +79,7 @@ class MapOutput {
this.localFS = FileSystem.getLocal(conf);
outputPath =
- mapOutputFile.getInputFileForWrite(mapId.getTaskID(),size);
+ mapOutputFile.getInputFileForWrite(this.attemptIdentifier.getTaskIndex(), size);
tmpOutputPath = outputPath.suffix(String.valueOf(fetcher));
disk = localFS.create(tmpOutputPath);
@@ -87,10 +87,10 @@ class MapOutput {
this.primaryMapOutput = primaryMapOutput;
}
- MapOutput(TezTaskAttemptID mapId, MergeManager merger, int size,
+ MapOutput(TaskAttemptIdentifier attemptIdentifier, MergeManager merger, int size,
boolean primaryMapOutput) {
this.id = ID.incrementAndGet();
- this.mapId = mapId;
+ this.attemptIdentifier = attemptIdentifier;
this.merger = merger;
type = Type.MEMORY;
@@ -107,10 +107,10 @@ class MapOutput {
this.primaryMapOutput = primaryMapOutput;
}
- public MapOutput(TezTaskAttemptID mapId) {
+ public MapOutput(TaskAttemptIdentifier attemptIdentifier) {
this.id = ID.incrementAndGet();
- this.mapId = mapId;
-
+ this.attemptIdentifier = attemptIdentifier;
+
type = Type.WAIT;
merger = null;
memory = null;
@@ -159,8 +159,8 @@ class MapOutput {
return disk;
}
- public TezTaskAttemptID getMapId() {
- return mapId;
+ public TaskAttemptIdentifier getAttemptIdentifier() {
+ return this.attemptIdentifier;
}
public Type getType() {
@@ -198,7 +198,7 @@ class MapOutput {
}
public String toString() {
- return "MapOutput(" + mapId + ", " + type + ")";
+ return "MapOutput( AttemptIdentifier: " + attemptIdentifier + ", Type: " + type + ")";
}
public static class MapOutputComparator