You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by bi...@apache.org on 2014/09/29 02:35:42 UTC

[36/50] [abbrv] git commit: TEZ-978. Enhance auto parallelism tuning for queries having empty outputs or data skewness (Rajesh Balamohan)

TEZ-978. Enhance auto parallelism tuning for queries having empty outputs or data skewness (Rajesh Balamohan)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/9159e117
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/9159e117
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/9159e117

Branch: refs/heads/branch-0.5
Commit: 9159e11700d3eff722fdda615565528182a619d2
Parents: 4023898
Author: Rajesh Balamohan <rb...@apache.org>
Authored: Tue Sep 23 05:59:50 2014 +0530
Committer: Rajesh Balamohan <rb...@apache.org>
Committed: Tue Sep 23 05:59:50 2014 +0530

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../vertexmanager/ShuffleVertexManager.java     |  60 ++++++++---
 .../vertexmanager/TestShuffleVertexManager.java | 105 ++++++++++++++++++-
 3 files changed, 146 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/9159e117/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 407acc9..f3b2ed0 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -56,6 +56,7 @@ ALL CHANGES
   TEZ-1550. TestEnvironmentUpdateUtils.testMultipleUpdateEnvironment fails on
   Windows
   TEZ-1554. Failing tests in TestMRHelpers related to environment on Windows
+  TEZ-978. Enhance auto parallelism tuning for queries having empty outputs or data skewness
 
 Release 0.5.0: 2014-09-03
 

http://git-wip-us.apache.org/repos/asf/tez/blob/9159e117/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java b/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java
index 85d46d5..2aaae16 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java
@@ -18,6 +18,7 @@
 
 package org.apache.tez.dag.library.vertexmanager;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.protobuf.ByteString;
 import java.io.IOException;
 import java.nio.ByteBuffer;
@@ -120,7 +121,7 @@ public class ShuffleVertexManager extends VertexManagerPlugin {
   int minTaskParallelism = 1;
   boolean enableAutoParallelism = false;
   boolean parallelismDetermined = false;
-  
+
   int totalNumSourceTasks = 0;
   int numSourceTasksCompleted = 0;
   int numVertexManagerEventsReceived = 0;
@@ -382,18 +383,43 @@ public class ShuffleVertexManager extends VertexManagerPlugin {
     totalNumSourceTasks = numSrcTasks;
   }
 
-  void determineParallelismAndApply() {
+  /**
+   * Compute optimal parallelism needed for the job
+   * @return true (if parallelism is determined), false otherwise
+   */
+  @VisibleForTesting
+  boolean determineParallelismAndApply() {
     if(numSourceTasksCompleted == 0) {
-      return;
+      return true;
     }
     
     if(numVertexManagerEventsReceived == 0) {
-      return;
+      return true;
     }
     
     int currentParallelism = pendingTasks.size();
-    long expectedTotalSourceTasksOutputSize = 
-        (totalNumSourceTasks*completedSourceTasksOutputSize)/numVertexManagerEventsReceived;
+    /**
+     * When overall completed output size is not even equal to
+     * desiredTaskInputSize, we can wait for some more data to be available to determine
+     * better parallelism until max.fraction is reached.  min.fraction is just a hint to the
+     * framework and need not be honored strictly in this case.
+     */
+    boolean canDetermineParallelismLater = (completedSourceTasksOutputSize <
+        desiredTaskInputDataSize)
+        && (numSourceTasksCompleted < (totalNumSourceTasks * slowStartMaxSrcCompletionFraction));
+    if (canDetermineParallelismLater) {
+      LOG.info("Defer scheduling tasks; vertex=" + getContext().getVertexName()
+          + ", totalNumSourceTasks=" + totalNumSourceTasks
+          + ", completedSourceTasksOutputSize=" + completedSourceTasksOutputSize
+          + ", numVertexManagerEventsReceived=" + numVertexManagerEventsReceived
+          + ", numSourceTasksCompleted=" + numSourceTasksCompleted + ", maxThreshold="
+          + (totalNumSourceTasks * slowStartMaxSrcCompletionFraction));
+      return false;
+    }
+
+    long expectedTotalSourceTasksOutputSize =
+        (totalNumSourceTasks * completedSourceTasksOutputSize) / numVertexManagerEventsReceived;
+
     int desiredTaskParallelism = 
         (int)(
             (expectedTotalSourceTasksOutputSize+desiredTaskInputDataSize-1)/
@@ -403,7 +429,7 @@ public class ShuffleVertexManager extends VertexManagerPlugin {
     }
     
     if(desiredTaskParallelism >= currentParallelism) {
-      return;
+      return true;
     }
     
     // most shufflers will be assigned this range
@@ -411,7 +437,7 @@ public class ShuffleVertexManager extends VertexManagerPlugin {
     
     if (basePartitionRange <= 1) {
       // nothing to do if range is equal 1 partition. shuffler does it by default
-      return;
+      return true;
     }
     
     int numShufflersWithBaseRange = currentParallelism / basePartitionRange;
@@ -425,7 +451,9 @@ public class ShuffleVertexManager extends VertexManagerPlugin {
         + " . Expected output: " + expectedTotalSourceTasksOutputSize 
         + " based on actual output: " + completedSourceTasksOutputSize
         + " from " + numVertexManagerEventsReceived + " vertex manager events. "
-        + " desiredTaskInputSize: " + desiredTaskInputDataSize);
+        + " desiredTaskInputSize: " + desiredTaskInputDataSize + " max slow start tasks:" +
+        (totalNumSourceTasks * slowStartMaxSrcCompletionFraction) + " num sources completed:" +
+        numSourceTasksCompleted);
           
     if(finalTaskParallelism < currentParallelism) {
       // final parallelism is less than actual parallelism
@@ -447,8 +475,9 @@ public class ShuffleVertexManager extends VertexManagerPlugin {
       }
       
       getContext().setVertexParallelism(finalTaskParallelism, null, edgeManagers, null);
-      updatePendingTasks();      
+      updatePendingTasks();
     }
+    return true;
   }
   
   void schedulePendingTasks(int numTasksToSchedule) {
@@ -460,9 +489,11 @@ public class ShuffleVertexManager extends VertexManagerPlugin {
     // calculating parallelism or change parallelism while tasks are already
     // running then we can create other parameters to trigger this calculation.
     if(enableAutoParallelism && !parallelismDetermined) {
-      // do this once
-      parallelismDetermined = true;
-      determineParallelismAndApply();
+      parallelismDetermined = determineParallelismAndApply();
+      if (!parallelismDetermined) {
+        //try to determine parallelism later when more info is available.
+        return;
+      }
     }
     List<TaskWithLocationHint> scheduledTasks = Lists.newArrayListWithCapacity(numTasksToSchedule);
     while(!pendingTasks.isEmpty() && numTasksToSchedule > 0) {
@@ -498,8 +529,7 @@ public class ShuffleVertexManager extends VertexManagerPlugin {
     // linearly increase the number of scheduled tasks such that all tasks are 
     // scheduled when source tasks completed fraction reaches max
     float tasksFractionToSchedule = 1; 
-    float percentRange = slowStartMaxSrcCompletionFraction - 
-                          slowStartMinSrcCompletionFraction;
+    float percentRange = slowStartMaxSrcCompletionFraction - slowStartMinSrcCompletionFraction;
     if (percentRange > 0) {
       tasksFractionToSchedule = 
             (completedSourceTaskFraction - slowStartMinSrcCompletionFraction)/

http://git-wip-us.apache.org/repos/asf/tez/blob/9159e117/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java b/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java
index 4768c6c..9ac8210 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java
@@ -232,14 +232,105 @@ public class TestShuffleVertexManager {
     Assert.assertEquals(4, scheduledTasks.size());
     Assert.assertEquals(1, manager.numSourceTasksCompleted);
     Assert.assertEquals(5000L, manager.completedSourceTasksOutputSize);
-    
-    
+
+    /**
+     * Test for TEZ-978
+     * Delay determining parallelism until enough data has been received.
+     */
+    scheduledTasks.clear();
+    payload =
+        VertexManagerEventPayloadProto.newBuilder().setOutputSize(1L).build().toByteString().asReadOnlyByteBuffer();
+    vmEvent = VertexManagerEvent.create("Vertex", payload);
+
+    //min/max fraction of 0.01/0.75 would ensure that we hit determineParallelism code path on receiving first event itself.
+    manager = createManager(conf, mockContext, 0.01f, 0.75f);
+    manager.onVertexStarted(null);
+    Assert.assertEquals(4, manager.pendingTasks.size()); // no tasks scheduled
+    Assert.assertEquals(4, manager.totalNumSourceTasks);
+    Assert.assertEquals(0, manager.numSourceTasksCompleted);
+
+    //First task in src1 completed with small payload
+    manager.onVertexManagerEventReceived(vmEvent); //small payload
+    manager.onSourceTaskCompleted(mockSrcVertexId1, new Integer(0));
+    Assert.assertTrue(manager.determineParallelismAndApply() == false);
+    Assert.assertEquals(4, manager.pendingTasks.size());
+    Assert.assertEquals(0, scheduledTasks.size()); // no tasks scheduled
+    Assert.assertEquals(1, manager.numSourceTasksCompleted);
+    Assert.assertEquals(1, manager.numVertexManagerEventsReceived);
+    Assert.assertEquals(1L, manager.completedSourceTasksOutputSize);
+
+    //Second task in src1 completed with small payload
+    manager.onVertexManagerEventReceived(vmEvent); //small payload
+    manager.onSourceTaskCompleted(mockSrcVertexId1, new Integer(0));
+    //Still overall data gathered has not reached threshold; So, ensure parallelism can be determined later
+    Assert.assertTrue(manager.determineParallelismAndApply() == false);
+    Assert.assertEquals(4, manager.pendingTasks.size());
+    Assert.assertEquals(0, scheduledTasks.size()); // no tasks scheduled
+    Assert.assertEquals(1, manager.numSourceTasksCompleted);
+    Assert.assertEquals(2, manager.numVertexManagerEventsReceived);
+    Assert.assertEquals(2L, manager.completedSourceTasksOutputSize);
+
+    //First task in src2 completed (with larger payload) to trigger determining parallelism
+    payload =
+        VertexManagerEventPayloadProto.newBuilder().setOutputSize(1200L).build().toByteString()
+            .asReadOnlyByteBuffer();
+    vmEvent = VertexManagerEvent.create("Vertex", payload);
+    manager.onVertexManagerEventReceived(vmEvent);
+    Assert.assertTrue(manager.determineParallelismAndApply()); //ensure parallelism is determined
+    verify(mockContext, times(1)).setVertexParallelism(eq(2), any(VertexLocationHint.class),
+        anyMap(),
+        anyMap());
+    manager.onSourceTaskCompleted(mockSrcVertexId2, new Integer(0));
+    Assert.assertEquals(1, manager.pendingTasks.size());
+    Assert.assertEquals(1, scheduledTasks.size());
+    Assert.assertEquals(2, manager.numSourceTasksCompleted);
+    Assert.assertEquals(3, manager.numVertexManagerEventsReceived);
+    Assert.assertEquals(1202L, manager.completedSourceTasksOutputSize);
+
+    //Test for max fraction. Min fraction is just instruction to framework, but honor max fraction
+    when(mockContext.getVertexNumTasks(mockSrcVertexId1)).thenReturn(20);
+    when(mockContext.getVertexNumTasks(mockSrcVertexId2)).thenReturn(20);
+    when(mockContext.getVertexNumTasks(mockManagedVertexId)).thenReturn(40);
+    scheduledTasks.clear();
+    payload =
+        VertexManagerEventPayloadProto.newBuilder().setOutputSize(100L).build().toByteString()
+            .asReadOnlyByteBuffer();
+    vmEvent = VertexManagerEvent.create("Vertex", payload);
+
+    //min/max fraction of 0.0/0.2
+    manager = createManager(conf, mockContext, 0.0f, 0.2f);
+    manager.onVertexStarted(null);
+    Assert.assertEquals(40, manager.pendingTasks.size()); // no tasks scheduled
+    Assert.assertEquals(40, manager.totalNumSourceTasks);
+    Assert.assertEquals(0, manager.numSourceTasksCompleted);
+    //send 7 events with payload size as 100
+    for(int i=0;i<7;i++) {
+      manager.onVertexManagerEventReceived(vmEvent); //small payload
+      manager.onSourceTaskCompleted(mockSrcVertexId1, new Integer(i));
+      //should not change parallelism
+      verify(mockContext, times(0)).setVertexParallelism(eq(4), any(VertexLocationHint.class),
+          anyMap(),
+          anyMap());
+    }
+    //send 8th event with payload size as 100
+    manager.onVertexManagerEventReceived(vmEvent);
+    manager.onSourceTaskCompleted(mockSrcVertexId1, new Integer(8));
+    //Since max threshold (40 * 0.2 = 8) is met, vertex manager should determine parallelism
+    verify(mockContext, times(1)).setVertexParallelism(eq(4), any(VertexLocationHint.class),
+        anyMap(),
+        anyMap());
+
+    //reset context for next test
+    when(mockContext.getVertexNumTasks(mockSrcVertexId1)).thenReturn(2);
+    when(mockContext.getVertexNumTasks(mockSrcVertexId2)).thenReturn(2);
+    when(mockContext.getVertexNumTasks(mockManagedVertexId)).thenReturn(4);
+
     // parallelism changed due to small data size
     scheduledTasks.clear();
     payload =
         VertexManagerEventPayloadProto.newBuilder().setOutputSize(500L).build().toByteString().asReadOnlyByteBuffer();
     vmEvent = VertexManagerEvent.create("Vertex", payload);
-    
+
     manager = createManager(conf, mockContext, 0.5f, 0.5f);
     manager.onVertexStarted(null);
     Assert.assertEquals(4, manager.pendingTasks.size()); // no tasks scheduled
@@ -266,7 +357,9 @@ public class TestShuffleVertexManager {
     manager.onVertexManagerEventReceived(vmEvent);
     manager.onSourceTaskCompleted(mockSrcVertexId1, new Integer(1));
     // managedVertex tasks reduced
-    verify(mockContext).setVertexParallelism(eq(2), any(VertexLocationHint.class), anyMap(), anyMap());
+    verify(mockContext, times(2)).setVertexParallelism(eq(2), any(VertexLocationHint.class),
+        anyMap(),
+        anyMap());
     Assert.assertEquals(2, newEdgeManagers.size());
     // TODO improve tests for parallelism
     Assert.assertEquals(0, manager.pendingTasks.size()); // all tasks scheduled
@@ -279,7 +372,9 @@ public class TestShuffleVertexManager {
     
     // more completions dont cause recalculation of parallelism
     manager.onSourceTaskCompleted(mockSrcVertexId2, new Integer(0));
-    verify(mockContext).setVertexParallelism(eq(2), any(VertexLocationHint.class), anyMap(), anyMap());
+    verify(mockContext, times(2)).setVertexParallelism(eq(2), any(VertexLocationHint.class),
+        anyMap(),
+        anyMap());
     Assert.assertEquals(2, newEdgeManagers.size());
     
     EdgeManagerPlugin edgeManager = newEdgeManagers.values().iterator().next();