You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2015/05/09 03:42:50 UTC

[12/43] tez git commit: TEZ-776. Reduce AM mem usage caused by storing TezEvents (bikas)

http://git-wip-us.apache.org/repos/asf/tez/blob/05f77fe2/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java b/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java
index 2ea0299..d177460 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java
@@ -33,9 +33,9 @@ import org.apache.hadoop.classification.InterfaceAudience.Public;
 import org.apache.hadoop.classification.InterfaceStability.Evolving;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.tez.common.TezUtils;
-import org.apache.tez.dag.api.EdgeManagerPlugin;
 import org.apache.tez.dag.api.EdgeManagerPluginContext;
 import org.apache.tez.dag.api.EdgeManagerPluginDescriptor;
+import org.apache.tez.dag.api.EdgeManagerPluginOnDemand;
 import org.apache.tez.dag.api.EdgeProperty;
 import org.apache.tez.dag.api.EdgeProperty.DataMovementType;
 import org.apache.tez.dag.api.InputDescriptor;
@@ -160,12 +160,15 @@ public class ShuffleVertexManager extends VertexManagerPlugin {
     super(context);
   }
 
-  public static class CustomShuffleEdgeManager extends EdgeManagerPlugin {
+  public static class CustomShuffleEdgeManager extends EdgeManagerPluginOnDemand {
     int numSourceTaskOutputs;
     int numDestinationTasks;
     int basePartitionRange;
     int remainderRangeForLastShuffler;
     int numSourceTasks;
+    
+    int[][] sourceIndices;
+    int[][] targetIndices;
 
     public CustomShuffleEdgeManager(EdgeManagerPluginContext context) {
       super(context);
@@ -231,8 +234,106 @@ public class ShuffleVertexManager extends VertexManagerPlugin {
 
       destinationTaskAndInputIndices.put(destinationTaskIndex, Collections.singletonList(targetIndex));
     }
+
+    @Override // on-demand routing: decides whether one source output goes to the queried destination task
+    public EventRouteMetadata routeDataMovementEventToDestination(
+        int sourceTaskIndex, int sourceOutputIndex, int destTaskIndex) throws Exception {
+      int sourceIndex = sourceOutputIndex;
+      int destinationTaskIndex = sourceIndex/basePartitionRange; // destination task index derived from the output index
+      if (destinationTaskIndex != destTaskIndex) {
+        return null; // this output is not routed to the queried destination task
+      }
+      int partitionRange = 1; // placeholder value; always overwritten by the if/else below
+      if(destinationTaskIndex < numDestinationTasks-1) {
+        partitionRange = basePartitionRange;
+      } else {
+        partitionRange = remainderRangeForLastShuffler; // last destination task uses the remainder range
+      }
+      
+      // all inputs from a source task are next to each other in original order
+      int targetIndex = 
+          sourceTaskIndex * partitionRange 
+          + sourceIndex % partitionRange; // NOTE(review): for the last task the modulus is the remainder range, not basePartitionRange; offsets may then differ from the order used by the composite routing path - confirm intended
+      return EventRouteMetadata.create(1, new int[]{targetIndex}); // exactly one target input index
+    }
+    
+    private int[] createIndices(int partitionRange, int taskIndex, int offSetPerTask) { // returns partitionRange consecutive ints
+      int startIndex = taskIndex * offSetPerTask; // first value in the returned run
+      int[] indices = new int[partitionRange];
+      for (int currentIndex = 0; currentIndex < partitionRange; ++currentIndex) {
+        indices[currentIndex] = (startIndex + currentIndex);
+      }
+      return indices;      
+    }
+    
+    @Override // precomputes the targetIndices/sourceIndices arrays read by routeCompositeDataMovementEventToDestination
+    public void prepareForRouting() throws Exception {
+      // target indices derive from num src tasks
+      int numSourceTasks = getContext().getSourceVertexNumTasks(); // local variable; shadows the numSourceTasks field
+      targetIndices = new int[numSourceTasks][];
+      for (int srcTaskIndex=0; srcTaskIndex<numSourceTasks; ++srcTaskIndex) {
+        targetIndices[srcTaskIndex] = createIndices(basePartitionRange, srcTaskIndex,
+            basePartitionRange); // basePartitionRange values starting at srcTaskIndex * basePartitionRange
+      }
+      
+      // source indices derive from num dest tasks (==partitions)
+      int numTargetTasks = getContext().getDestinationVertexNumTasks();
+      sourceIndices = new int[numTargetTasks][];
+      for (int destTaskIndex=0; destTaskIndex<numTargetTasks; ++destTaskIndex) {
+        int partitionRange = basePartitionRange;
+        if (destTaskIndex == (numTargetTasks-1)) {
+          partitionRange = remainderRangeForLastShuffler; // last destination task gets the remainder range
+        }
+        // skip the basePartitionRange per destination task
+        sourceIndices[destTaskIndex] = createIndices(partitionRange, destTaskIndex,
+            basePartitionRange); // partitionRange values starting at destTaskIndex * basePartitionRange
+      }
+    }
+
+    private int[] createTargetIndicesForRemainder(int srcTaskIndex) { // target indices for the last destination task
+      // for the last task just generate on the fly instead of doubling the memory
+      return createIndices(remainderRangeForLastShuffler, srcTaskIndex,
+          remainderRangeForLastShuffler); // remainder-sized run starting at srcTaskIndex * remainderRangeForLastShuffler
+    }
     
     @Override
+    public @Nullable EventRouteMetadata routeCompositeDataMovementEventToDestination( // pairs a source task's target indices with a destination task's source indices
+        int sourceTaskIndex, int destinationTaskIndex)
+        throws Exception {
+      int[] targetIndicesToSend;
+      int partitionRange;
+      if(destinationTaskIndex == (numDestinationTasks-1)) {
+        if (remainderRangeForLastShuffler != basePartitionRange) {
+          targetIndicesToSend = createTargetIndicesForRemainder(sourceTaskIndex); // cached arrays are basePartitionRange-sized, so build a remainder-sized one
+        } else {
+          targetIndicesToSend = targetIndices[sourceTaskIndex]; // ranges are equal; reuse the cached array
+        }
+        partitionRange = remainderRangeForLastShuffler;
+      } else {
+        targetIndicesToSend = targetIndices[sourceTaskIndex]; // array built in prepareForRouting
+        partitionRange = basePartitionRange;
+      }
+
+      return EventRouteMetadata.create(partitionRange, targetIndicesToSend, 
+          sourceIndices[destinationTaskIndex]); // source indices array built in prepareForRouting
+    }
+
+    @Override // lists every input index of the destination task that belongs to the given source task
+    public EventRouteMetadata routeInputSourceTaskFailedEventToDestination(
+        int sourceTaskIndex, int destinationTaskIndex) throws Exception {
+      int partitionRange = basePartitionRange;
+      if (destinationTaskIndex == (numDestinationTasks-1)) {
+        partitionRange = remainderRangeForLastShuffler; // last destination task uses the remainder range
+      }
+      int startOffset = sourceTaskIndex * partitionRange;        
+      int[] targetIndices = new int[partitionRange]; // local array; shadows the targetIndices field
+      for (int i=0; i<partitionRange; ++i) {
+        targetIndices[i] = (startOffset + i); // consecutive indices starting at startOffset
+      }
+      return EventRouteMetadata.create(partitionRange, targetIndices);
+    }
+
+    @Override
     public void routeInputSourceTaskFailedEventToDestination(int sourceTaskIndex, 
         Map<Integer, List<Integer>> destinationTaskAndInputIndices) {
       if (remainderRangeForLastShuffler < basePartitionRange) {
@@ -281,6 +382,18 @@ public class ShuffleVertexManager extends VertexManagerPlugin {
     }
 
     @Override
+    public int routeInputErrorEventToSource(int destinationTaskIndex, // maps a failed destination input index to a source task index
+        int destinationFailedInputIndex) {
+      int partitionRange = 1; // placeholder value; always overwritten by the if/else below
+      if(destinationTaskIndex < numDestinationTasks-1) {
+        partitionRange = basePartitionRange;
+      } else {
+        partitionRange = remainderRangeForLastShuffler; // last destination task uses the remainder range
+      }
+      return destinationFailedInputIndex/partitionRange; // integer division undoes targetIndex = sourceTaskIndex * partitionRange + offset
+    }
+
+    @Override // returns numDestinationTasks regardless of sourceTaskIndex
     public int getNumDestinationConsumerTasks(int sourceTaskIndex) {
       return numDestinationTasks;
     }

http://git-wip-us.apache.org/repos/asf/tez/blob/05f77fe2/tez-tests/src/test/java/org/apache/tez/test/TestExceptionPropagation.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestExceptionPropagation.java b/tez-tests/src/test/java/org/apache/tez/test/TestExceptionPropagation.java
index 70be21b..7ba6028 100644
--- a/tez-tests/src/test/java/org/apache/tez/test/TestExceptionPropagation.java
+++ b/tez-tests/src/test/java/org/apache/tez/test/TestExceptionPropagation.java
@@ -308,7 +308,7 @@ public class TestExceptionPropagation {
     // EdgeManager
     EM_Initialize, EM_GetNumDestinationTaskPhysicalInputs, EM_GetNumSourceTaskPhysicalOutputs,
     EM_RouteDataMovementEventToDestination, EM_GetNumDestinationConsumerTasks,
-    EM_RouteInputErrorEventToSource,
+    EM_RouteInputErrorEventToSource, EM_PrepareForRouting,
     // Not Supported yet
     // EM_RouteInputSourceTaskFailedEventToDestination,
 
@@ -814,6 +814,33 @@ public class TestExceptionPropagation {
       super.routeDataMovementEventToDestination(event, sourceTaskIndex,
           sourceOutputIndex, destinationTaskAndInputIndices);
     }
+    
+    @Override // test hook: optionally fails before delegating to the parent implementation
+    public void prepareForRouting() throws Exception {
+      if (exLocation == ExceptionLocation.EM_PrepareForRouting) {
+        throw new RuntimeException(exLocation.name()); // message is the enum constant name
+      }
+      super.prepareForRouting();
+    }
+    
+    @Override // test hook: optionally fails before delegating to the parent implementation
+    public EventRouteMetadata routeDataMovementEventToDestination(
+        int sourceTaskIndex, int sourceOutputIndex, int destinationTaskIndex) throws Exception {
+      if (exLocation == ExceptionLocation.EM_RouteDataMovementEventToDestination) {
+        throw new RuntimeException(exLocation.name()); // message is the enum constant name
+      }
+      return super.routeDataMovementEventToDestination(sourceTaskIndex, sourceOutputIndex, destinationTaskIndex);
+    }
+
+    @Override // test hook: optionally fails before delegating to the parent implementation
+    public EventRouteMetadata routeCompositeDataMovementEventToDestination(
+        int sourceTaskIndex, int destinationTaskIndex)
+        throws Exception {
+      if (exLocation == ExceptionLocation.EM_RouteDataMovementEventToDestination) { // same location constant as the non-composite overload
+        throw new RuntimeException(exLocation.name()); // message is the enum constant name
+      }
+      return super.routeCompositeDataMovementEventToDestination(sourceTaskIndex, destinationTaskIndex);
+    }
 
     @Override
     public int routeInputErrorEventToSource(InputReadErrorEvent event,
@@ -826,6 +853,16 @@ public class TestExceptionPropagation {
     }
 
     @Override
+    public int routeInputErrorEventToSource(int destinationTaskIndex, // test hook: optionally fails before delegating to the parent implementation
+        int destinationFailedInputIndex) {
+      if (exLocation == ExceptionLocation.EM_RouteInputErrorEventToSource) {
+        throw new RuntimeException(exLocation.name()); // message is the enum constant name
+      }
+      return super.routeInputErrorEventToSource(destinationTaskIndex,
+          destinationFailedInputIndex);
+    }
+
+    @Override
     public void routeInputSourceTaskFailedEventToDestination(
         int sourceTaskIndex,
         Map<Integer, List<Integer>> destinationTaskAndInputIndices) {