You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2015/05/09 03:42:50 UTC
[12/43] tez git commit: TEZ-776. Reduce AM mem usage caused by storing TezEvents (bikas)
http://git-wip-us.apache.org/repos/asf/tez/blob/05f77fe2/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java b/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java
index 2ea0299..d177460 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java
@@ -33,9 +33,9 @@ import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Evolving;
import org.apache.hadoop.conf.Configuration;
import org.apache.tez.common.TezUtils;
-import org.apache.tez.dag.api.EdgeManagerPlugin;
import org.apache.tez.dag.api.EdgeManagerPluginContext;
import org.apache.tez.dag.api.EdgeManagerPluginDescriptor;
+import org.apache.tez.dag.api.EdgeManagerPluginOnDemand;
import org.apache.tez.dag.api.EdgeProperty;
import org.apache.tez.dag.api.EdgeProperty.DataMovementType;
import org.apache.tez.dag.api.InputDescriptor;
@@ -160,12 +160,15 @@ public class ShuffleVertexManager extends VertexManagerPlugin {
super(context);
}
- public static class CustomShuffleEdgeManager extends EdgeManagerPlugin {
+ public static class CustomShuffleEdgeManager extends EdgeManagerPluginOnDemand {
int numSourceTaskOutputs;
int numDestinationTasks;
int basePartitionRange;
int remainderRangeForLastShuffler;
int numSourceTasks;
+
+ int[][] sourceIndices;
+ int[][] targetIndices;
public CustomShuffleEdgeManager(EdgeManagerPluginContext context) {
super(context);
@@ -231,8 +234,106 @@ public class ShuffleVertexManager extends VertexManagerPlugin {
destinationTaskAndInputIndices.put(destinationTaskIndex, Collections.singletonList(targetIndex));
}
+
+ /**
+ * Routes a single data movement event to one candidate destination task.
+ * Returns null when the event's partition belongs to a different task.
+ */
+ @Override
+ public EventRouteMetadata routeDataMovementEventToDestination(
+ int sourceTaskIndex, int sourceOutputIndex, int destTaskIndex) throws Exception {
+ // Each destination task owns a contiguous run of basePartitionRange
+ // partitions, so integer division identifies the owner.
+ int owningTask = sourceOutputIndex / basePartitionRange;
+ if (owningTask == destTaskIndex) {
+ // The last destination task uses the remainder range.
+ int range = (owningTask < numDestinationTasks - 1)
+ ? basePartitionRange
+ : remainderRangeForLastShuffler;
+ // All inputs from one source task sit next to each other, in original order.
+ // NOTE(review): this uses sourceOutputIndex % range. Composite routing
+ // instead uses the offset within the partition run. When
+ // remainderRangeForLastShuffler != basePartitionRange the last task's
+ // slots may be ordered differently; confirm this is intended.
+ int inputIndex = sourceTaskIndex * range + sourceOutputIndex % range;
+ return EventRouteMetadata.create(1, new int[]{inputIndex});
+ }
+ // The partition belongs to some other destination task.
+ return null;
+ }
+
+ /**
+ * Builds partitionRange consecutive indices.
+ * The first index is taskIndex * offSetPerTask.
+ */
+ private int[] createIndices(int partitionRange, int taskIndex, int offSetPerTask) {
+ int[] result = new int[partitionRange];
+ int next = taskIndex * offSetPerTask;
+ for (int slot = 0; slot < result.length; slot++) {
+ result[slot] = next++;
+ }
+ return result;
+ }
+
+ /**
+ * Precomputes the index tables used by composite event routing.
+ * targetIndices[s] holds the basePartitionRange consecutive input indices
+ * starting at s * basePartitionRange.
+ * sourceIndices[d] holds the partition indices that destination task d
+ * consumes; the last task gets remainderRangeForLastShuffler of them.
+ */
+ @Override
+ public void prepareForRouting() throws Exception {
+ // target indices derive from num src tasks.
+ // The local is named to avoid hiding the numSourceTasks field.
+ int srcTaskCount = getContext().getSourceVertexNumTasks();
+ targetIndices = new int[srcTaskCount][];
+ for (int srcTaskIndex = 0; srcTaskIndex < srcTaskCount; ++srcTaskIndex) {
+ targetIndices[srcTaskIndex] = createIndices(basePartitionRange, srcTaskIndex,
+ basePartitionRange);
+ }
+
+ // source indices derive from num dest tasks (== partitions)
+ int numTargetTasks = getContext().getDestinationVertexNumTasks();
+ sourceIndices = new int[numTargetTasks][];
+ for (int destTaskIndex = 0; destTaskIndex < numTargetTasks; ++destTaskIndex) {
+ // the last destination task consumes the remainder range
+ int partitionRange = (destTaskIndex == numTargetTasks - 1)
+ ? remainderRangeForLastShuffler
+ : basePartitionRange;
+ // skip basePartitionRange partitions per preceding destination task
+ sourceIndices[destTaskIndex] = createIndices(partitionRange, destTaskIndex,
+ basePartitionRange);
+ }
+ }
+
+ /**
+ * Input indices a source task occupies on the last destination task when
+ * that task has a remainder range.
+ * They are generated on the fly rather than cached, to avoid keeping a
+ * second per-source-task table in memory.
+ */
+ private int[] createTargetIndicesForRemainder(int srcTaskIndex) {
+ final int range = remainderRangeForLastShuffler;
+ return createIndices(range, srcTaskIndex, range);
+ }
+ public @Nullable EventRouteMetadata routeCompositeDataMovementEventToDestination(
+ int sourceTaskIndex, int destinationTaskIndex)
+ throws Exception {
+ // Every destination task except the last consumes basePartitionRange
+ // partitions; the last consumes remainderRangeForLastShuffler.
+ boolean isLastTask = (destinationTaskIndex == numDestinationTasks - 1);
+ int partitionRange = isLastTask ? remainderRangeForLastShuffler : basePartitionRange;
+ // The cached targetIndices rows are sized for basePartitionRange.
+ // A differing remainder range needs freshly built indices.
+ int[] targetIndicesToSend = (isLastTask && remainderRangeForLastShuffler != basePartitionRange)
+ ? createTargetIndicesForRemainder(sourceTaskIndex)
+ : targetIndices[sourceTaskIndex];
+ return EventRouteMetadata.create(partitionRange, targetIndicesToSend,
+ sourceIndices[destinationTaskIndex]);
+ }
+
+ /**
+ * Routes a source-task-failed notification to one destination task.
+ * The result covers every input slot the failed source task occupies there.
+ */
+ @Override
+ public EventRouteMetadata routeInputSourceTaskFailedEventToDestination(
+ int sourceTaskIndex, int destinationTaskIndex) throws Exception {
+ int partitionRange = (destinationTaskIndex == numDestinationTasks - 1)
+ ? remainderRangeForLastShuffler
+ : basePartitionRange;
+ // Reuse the shared index builder instead of a hand-rolled loop. The local
+ // is not named targetIndices, so the field of that name is not hidden.
+ int[] failedInputIndices = createIndices(partitionRange, sourceTaskIndex, partitionRange);
+ return EventRouteMetadata.create(partitionRange, failedInputIndices);
+ }
+
+ @Override
public void routeInputSourceTaskFailedEventToDestination(int sourceTaskIndex,
Map<Integer, List<Integer>> destinationTaskAndInputIndices) {
if (remainderRangeForLastShuffler < basePartitionRange) {
@@ -281,6 +382,18 @@ public class ShuffleVertexManager extends VertexManagerPlugin {
}
+ public int routeInputErrorEventToSource(int destinationTaskIndex,
+ int destinationFailedInputIndex) {
+ // A destination's inputs are laid out per source task, in runs of that
+ // destination's partition range. Dividing by the range yields the source task.
+ boolean lastTask = destinationTaskIndex >= numDestinationTasks - 1;
+ int rangePerSourceTask = lastTask ? remainderRangeForLastShuffler : basePartitionRange;
+ return destinationFailedInputIndex / rangePerSourceTask;
+ }
+
+ @Override
public int getNumDestinationConsumerTasks(int sourceTaskIndex) {
return numDestinationTasks;
}
http://git-wip-us.apache.org/repos/asf/tez/blob/05f77fe2/tez-tests/src/test/java/org/apache/tez/test/TestExceptionPropagation.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestExceptionPropagation.java b/tez-tests/src/test/java/org/apache/tez/test/TestExceptionPropagation.java
index 70be21b..7ba6028 100644
--- a/tez-tests/src/test/java/org/apache/tez/test/TestExceptionPropagation.java
+++ b/tez-tests/src/test/java/org/apache/tez/test/TestExceptionPropagation.java
@@ -308,7 +308,7 @@ public class TestExceptionPropagation {
// EdgeManager
EM_Initialize, EM_GetNumDestinationTaskPhysicalInputs, EM_GetNumSourceTaskPhysicalOutputs,
EM_RouteDataMovementEventToDestination, EM_GetNumDestinationConsumerTasks,
- EM_RouteInputErrorEventToSource,
+ EM_RouteInputErrorEventToSource, EM_PrepareForRouting,
// Not Supported yet
// EM_RouteInputSourceTaskFailedEventToDestination,
@@ -814,6 +814,33 @@ public class TestExceptionPropagation {
super.routeDataMovementEventToDestination(event, sourceTaskIndex,
sourceOutputIndex, destinationTaskAndInputIndices);
}
+
+ @Override
+ public void prepareForRouting() throws Exception {
+ // Inject a failure only when the test targets this hook.
+ boolean injectFailure = exLocation == ExceptionLocation.EM_PrepareForRouting;
+ if (injectFailure) {
+ throw new RuntimeException(exLocation.name());
+ }
+ super.prepareForRouting();
+ }
+
+ @Override
+ public EventRouteMetadata routeDataMovementEventToDestination(
+ int sourceTaskIndex, int sourceOutputIndex, int destinationTaskIndex) throws Exception {
+ // Inject the configured failure before delegating to the superclass.
+ if (ExceptionLocation.EM_RouteDataMovementEventToDestination == exLocation) {
+ throw new RuntimeException(exLocation.name());
+ }
+ return super.routeDataMovementEventToDestination(sourceTaskIndex, sourceOutputIndex,
+ destinationTaskIndex);
+ }
+
+ @Override
+ public EventRouteMetadata routeCompositeDataMovementEventToDestination(
+ int sourceTaskIndex, int destinationTaskIndex)
+ throws Exception {
+ // Composite routing shares the data-movement failure location with
+ // single-event routing.
+ boolean injectFailure =
+ exLocation == ExceptionLocation.EM_RouteDataMovementEventToDestination;
+ if (injectFailure) {
+ throw new RuntimeException(exLocation.name());
+ }
+ return super.routeCompositeDataMovementEventToDestination(sourceTaskIndex,
+ destinationTaskIndex);
+ }
@Override
public int routeInputErrorEventToSource(InputReadErrorEvent event,
@@ -826,6 +853,16 @@ public class TestExceptionPropagation {
}
+ public int routeInputErrorEventToSource(int destinationTaskIndex,
+ int destinationFailedInputIndex) {
+ // Inject the configured failure before delegating to the superclass.
+ boolean injectFailure = exLocation == ExceptionLocation.EM_RouteInputErrorEventToSource;
+ if (injectFailure) {
+ throw new RuntimeException(exLocation.name());
+ }
+ return super.routeInputErrorEventToSource(destinationTaskIndex,
+ destinationFailedInputIndex);
+ }
+
+ @Override
public void routeInputSourceTaskFailedEventToDestination(
int sourceTaskIndex,
Map<Integer, List<Integer>> destinationTaskAndInputIndices) {