You are viewing a plain-text version of this content. The canonical version is on the original mailing-list archive page; its hyperlink was not preserved in this plain-text copy.
Posted to commits@tez.apache.org by zh...@apache.org on 2017/04/10 18:20:14 UTC

[1/2] tez git commit: TEZ-3654. Make CartesianProduct edge work with GroupInputEdge (zhiyuany)

Repository: tez
Updated Branches:
  refs/heads/master e0ee28ae6 -> f355a050c


http://git-wip-us.apache.org/repos/asf/tez/blob/f355a050/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductEdgeManagerPartitioned.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductEdgeManagerPartitioned.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductEdgeManagerPartitioned.java
index 1afedb9..b586de6 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductEdgeManagerPartitioned.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductEdgeManagerPartitioned.java
@@ -50,8 +50,8 @@ public class TestCartesianProductEdgeManagerPartitioned {
    */
   @Test(timeout = 5000)
   public void testTwoWay() throws Exception {
-    CartesianProductEdgeManagerConfig emConfig =
-      new CartesianProductEdgeManagerConfig(true, new String[]{"v0","v1"}, new int[]{3,4}, null, null, null);
+    CartesianProductEdgeManagerConfig emConfig = new CartesianProductEdgeManagerConfig(true,
+      new String[]{"v0","v1"}, new int[]{3,4}, null, 0, 0, null);
     when(mockContext.getDestinationVertexNumTasks()).thenReturn(12);
     testTwoWayV0(emConfig);
     testTwoWayV1(emConfig);
@@ -144,9 +144,8 @@ public class TestCartesianProductEdgeManagerPartitioned {
     CartesianProductFilterDescriptor filterDescriptor =
       new CartesianProductFilterDescriptor(TestFilter.class.getName())
         .setUserPayload(UserPayload.create(buffer));
-    CartesianProductEdgeManagerConfig emConfig =
-      new CartesianProductEdgeManagerConfig(true, new String[]{"v0","v1"}, new int[]{3,4}, null, null,
-        filterDescriptor);
+    CartesianProductEdgeManagerConfig emConfig = new CartesianProductEdgeManagerConfig(true,
+      new String[]{"v0","v1"}, new int[]{3,4}, null, 0, 0, filterDescriptor);
     when(mockContext.getDestinationVertexNumTasks()).thenReturn(3);
     testTwoWayV0WithFilter(emConfig);
     testTwoWayV1WithFilter(emConfig);
@@ -205,8 +204,8 @@ public class TestCartesianProductEdgeManagerPartitioned {
    */
   @Test(timeout = 5000)
   public void testThreeWay() throws Exception {
-    CartesianProductEdgeManagerConfig emConfig =
-      new CartesianProductEdgeManagerConfig(true, new String[]{"v0","v1","v2"}, new int[]{4,3,2}, null, null, null);
+    CartesianProductEdgeManagerConfig emConfig = new CartesianProductEdgeManagerConfig(true,
+      new String[]{"v0","v1","v2"}, new int[]{4,3,2}, null, 0, 0, null);
     when(mockContext.getDestinationVertexNumTasks()).thenReturn(24);
     testThreeWayV0(emConfig);
     testThreeWayV1(emConfig);

http://git-wip-us.apache.org/repos/asf/tez/blob/f355a050/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductEdgeManagerUnpartitioned.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductEdgeManagerUnpartitioned.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductEdgeManagerUnpartitioned.java
index db781f3..1ce9c8b 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductEdgeManagerUnpartitioned.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductEdgeManagerUnpartitioned.java
@@ -23,6 +23,10 @@ import org.apache.tez.dag.api.EdgeManagerPluginOnDemand.CompositeEventRouteMetad
 import org.junit.Before;
 import org.junit.Test;
 
+import static org.apache.tez.runtime.library.cartesianproduct.TestCartesianProductEdgeManagerUnpartitioned.TestData.dataForDest;
+import static org.apache.tez.runtime.library.cartesianproduct.TestCartesianProductEdgeManagerUnpartitioned.TestData.dataForInputError;
+import static org.apache.tez.runtime.library.cartesianproduct.TestCartesianProductEdgeManagerUnpartitioned.TestData.dataForRouting;
+import static org.apache.tez.runtime.library.cartesianproduct.TestCartesianProductEdgeManagerUnpartitioned.TestData.dataForSrc;
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
@@ -40,186 +44,128 @@ public class TestCartesianProductEdgeManagerUnpartitioned {
     edgeManager = new CartesianProductEdgeManagerUnpartitioned(mockContext);
   }
 
-  /**
-   * Vertex v0 has 2 tasks
-   * Vertex v1 has 3 tasks
-   */
-  @Test(timeout = 5000)
-  public void testTwoWay() throws Exception {
-    CartesianProductEdgeManagerConfig emConfig =
-      new CartesianProductEdgeManagerConfig(false, new String[]{"v0","v1"}, null,
-        new int[]{2,3}, new int[]{2,3}, null);
-    testTwoWayV0(emConfig);
-    testTwoWayV1(emConfig);
-  }
-
-  private void testTwoWayV0(CartesianProductEdgeManagerConfig config) throws Exception {
-    when(mockContext.getSourceVertexName()).thenReturn("v0");
-    when(mockContext.getSourceVertexNumTasks()).thenReturn(2);
-    edgeManager.initialize(config);
-
-    CompositeEventRouteMetadata compositeRoutingData =
-      edgeManager.routeCompositeDataMovementEventToDestination(1, 1);
-    assertNull(compositeRoutingData);
+  static class TestData {
+    int srcId, destId, inputId;
+    Object expected;
 
-    compositeRoutingData = edgeManager.routeCompositeDataMovementEventToDestination(1, 3);
-    assertNotNull(compositeRoutingData);
-    assertEquals(1, compositeRoutingData.getCount());
-    assertEquals(0, compositeRoutingData.getTarget());
-    assertEquals(0, compositeRoutingData.getSource());
+    public TestData(int srcId, int destId, int inputId, Object expected) {
+      this.srcId = srcId;
+      this.destId = destId;
+      this.inputId = inputId;
+      this.expected = expected;
+    }
 
-    EventRouteMetadata routingData = edgeManager.routeInputSourceTaskFailedEventToDestination(1, 1);
-    assertNull(routingData);
+    public static TestData dataForRouting(int srcId, int destId, Object expected) {
+      return new TestData(srcId, destId, -1, expected);
+    }
 
-    routingData = edgeManager.routeInputSourceTaskFailedEventToDestination(1, 3);
-    assertNotNull(routingData);
-    assertEquals(1, routingData.getNumEvents());
-    assertArrayEquals(new int[]{0}, routingData.getTargetIndices());
+    public static TestData dataForInputError(int destId, int inputId, Object expected) {
+      return new TestData(-1, destId, inputId, expected);
+    }
 
-    assertEquals(0, edgeManager.routeInputErrorEventToSource(1, 0));
+    public static TestData dataForSrc(int srcId, Object expected) {
+      return new TestData(srcId, -1, -1, expected);
+    }
 
-    assertEquals(1, edgeManager.getNumDestinationTaskPhysicalInputs(1));
-    assertEquals(1, edgeManager.getNumSourceTaskPhysicalOutputs(1));
-    assertEquals(3, edgeManager.getNumDestinationConsumerTasks(1));
+    public static TestData dataForDest(int destId, Object expected) {
+      return new TestData(-1, destId, -1, expected);
+    }
   }
 
-  private void testTwoWayV1(CartesianProductEdgeManagerConfig config) throws Exception {
-    when(mockContext.getSourceVertexName()).thenReturn("v1");
-    when(mockContext.getSourceVertexNumTasks()).thenReturn(3);
-    edgeManager.initialize(config);
-
-    CompositeEventRouteMetadata compositeRoutingData =
-      edgeManager.routeCompositeDataMovementEventToDestination(1, 2);
-    assertNull(compositeRoutingData);
-
-    compositeRoutingData = edgeManager.routeCompositeDataMovementEventToDestination(1, 1);
-    assertNotNull(compositeRoutingData);
-    assertEquals(1, compositeRoutingData.getCount());
-    assertEquals(0, compositeRoutingData.getTarget());
-    assertEquals(0, compositeRoutingData.getSource());
-
-    EventRouteMetadata routingData = edgeManager.routeInputSourceTaskFailedEventToDestination(1, 2);
-    assertNull(routingData);
-
-    routingData = edgeManager.routeInputSourceTaskFailedEventToDestination(1, 1);
-    assertNotNull(routingData);
-    assertEquals(1, routingData.getNumEvents());
-    assertArrayEquals(new int[]{0}, routingData.getTargetIndices());
-
-    assertEquals(1, edgeManager.routeInputErrorEventToSource(1, 0));
-
-    assertEquals(1, edgeManager.getNumDestinationTaskPhysicalInputs(1));
-    assertEquals(1, edgeManager.getNumSourceTaskPhysicalOutputs(1));
-    assertEquals(2, edgeManager.getNumDestinationConsumerTasks(1));
+  private void testEdgeManager(CartesianProductEdgeManagerConfig conf, String vName, int numTask,
+                               String groupName, TestData cDMEInvalid, TestData cDMEValid,
+                               TestData srcFailInvalid, TestData srcFailValid,
+                               TestData inputError, TestData numDestInput,
+                               TestData numSrcOutputTest, TestData numConsumerTest)
+    throws Exception {
+    when(mockContext.getSourceVertexName()).thenReturn(vName);
+    when(mockContext.getSourceVertexNumTasks()).thenReturn(numTask);
+    when(mockContext.getVertexGroupName()).thenReturn(groupName);
+    edgeManager.initialize(conf);
+
+    CompositeEventRouteMetadata cDME =
+      edgeManager.routeCompositeDataMovementEventToDestination(cDMEInvalid.srcId,
+        cDMEInvalid.destId);
+    assertNull(cDME);
+
+    cDME = edgeManager.routeCompositeDataMovementEventToDestination(cDMEValid.srcId,
+      cDMEValid.destId);
+    assertNotNull(cDME);
+    CompositeEventRouteMetadata expectedCDME = (CompositeEventRouteMetadata)(cDMEValid.expected);
+    assertEquals(expectedCDME.getCount(), cDME.getCount());
+    assertEquals(expectedCDME.getTarget(), cDME.getTarget());
+    assertEquals(expectedCDME.getSource(), cDME.getSource());
+
+    EventRouteMetadata dme =
+      edgeManager.routeInputSourceTaskFailedEventToDestination(srcFailInvalid.srcId,
+        srcFailInvalid.destId);
+    assertNull(dme);
+
+    dme = edgeManager.routeInputSourceTaskFailedEventToDestination(srcFailValid.srcId,
+      srcFailValid.destId);
+    assertNotNull(dme);
+    EventRouteMetadata expectedDME = (EventRouteMetadata)(srcFailValid.expected);
+    assertEquals(expectedDME.getNumEvents(), dme.getNumEvents());
+    assertArrayEquals(expectedDME.getTargetIndices(), dme.getTargetIndices());
+
+    assertEquals(inputError.expected,
+      edgeManager.routeInputErrorEventToSource(inputError.destId, inputError.inputId));
+
+    assertEquals(numDestInput.expected,
+      edgeManager.getNumDestinationTaskPhysicalInputs(numDestInput.destId));
+    assertEquals(numSrcOutputTest.expected,
+      edgeManager.getNumSourceTaskPhysicalOutputs(numSrcOutputTest.srcId));
+    assertEquals(numConsumerTest.expected,
+      edgeManager.getNumDestinationConsumerTasks(numConsumerTest.srcId));
   }
 
   /**
    * Vertex v0 has 2 tasks
    * Vertex v1 has 3 tasks
-   * Vertex v2 has 4 tasks
    */
   @Test(timeout = 5000)
-  public void testThreeWay() throws Exception {
-    CartesianProductEdgeManagerConfig emConfig =
-      new CartesianProductEdgeManagerConfig(false, new String[]{"v0","v1","v2"}, null,
-        new int[]{2,3,4}, new int[]{2,3,4}, null);
-    testThreeWayV0(emConfig);
-    testThreeWayV1(emConfig);
-    testThreeWayV2(emConfig);
-  }
-
-  private void testThreeWayV0(CartesianProductEdgeManagerConfig config) throws Exception {
-    when(mockContext.getSourceVertexName()).thenReturn("v0");
-    when(mockContext.getSourceVertexNumTasks()).thenReturn(2);
-    edgeManager.initialize(config);
-
-    CompositeEventRouteMetadata compositeRoutingData = edgeManager.routeCompositeDataMovementEventToDestination(1, 1);
-    assertNull(compositeRoutingData);
-
-    compositeRoutingData = edgeManager.routeCompositeDataMovementEventToDestination(1, 12);
-    assertNotNull(compositeRoutingData);
-    assertEquals(1, compositeRoutingData.getCount());
-    assertEquals(0, compositeRoutingData.getTarget());
-    assertEquals(0, compositeRoutingData.getSource());
-
-    EventRouteMetadata routingData = edgeManager.routeInputSourceTaskFailedEventToDestination(1, 1);
-    assertNull(routingData);
-
-    routingData = edgeManager.routeInputSourceTaskFailedEventToDestination(1, 12);
-    assertNotNull(routingData);
-    assertEquals(1, routingData.getNumEvents());
-    assertArrayEquals(new int[]{0}, routingData.getTargetIndices());
-
-    assertEquals(0, edgeManager.routeInputErrorEventToSource(1, 0));
-
-    assertEquals(1, edgeManager.getNumDestinationTaskPhysicalInputs(1));
-    assertEquals(1, edgeManager.getNumSourceTaskPhysicalOutputs(1));
-    assertEquals(12, edgeManager.getNumDestinationConsumerTasks(1));
-  }
-
-  private void testThreeWayV1(CartesianProductEdgeManagerConfig config) throws Exception {
-    when(mockContext.getSourceVertexName()).thenReturn("v1");
-    when(mockContext.getSourceVertexNumTasks()).thenReturn(3);
-    edgeManager.initialize(config);
-
-    CompositeEventRouteMetadata compositeRoutingData = edgeManager.routeCompositeDataMovementEventToDestination(1, 1);
-    assertNull(compositeRoutingData);
-
-    compositeRoutingData = edgeManager.routeCompositeDataMovementEventToDestination(1, 16);
-    assertNotNull(compositeRoutingData);
-    assertEquals(1, compositeRoutingData.getCount());
-    assertEquals(0, compositeRoutingData.getTarget());
-    assertEquals(0, compositeRoutingData.getSource());
-
-    EventRouteMetadata routingData = edgeManager.routeInputSourceTaskFailedEventToDestination(1, 1);
-    assertNull(routingData);
-
-    routingData = edgeManager.routeInputSourceTaskFailedEventToDestination(1, 16);
-    assertNotNull(routingData);
-    assertEquals(1, routingData.getNumEvents());
-    assertArrayEquals(new int[]{0}, routingData.getTargetIndices());
-
-    assertEquals(0, edgeManager.routeInputErrorEventToSource(1, 0));
-
-    assertEquals(1, edgeManager.getNumDestinationTaskPhysicalInputs(1));
-    assertEquals(1, edgeManager.getNumSourceTaskPhysicalOutputs(1));
-    assertEquals(8, edgeManager.getNumDestinationConsumerTasks(1));
+  public void testTwoWayAllVertex() throws Exception {
+    testEdgeManager(new CartesianProductEdgeManagerConfig(false, new String[]{"v0","v1"}, null,
+      new int[]{2,3}, 2, 0, null), "v0", 2, null,
+      dataForRouting(1, 1, null), dataForRouting(1, 3, CompositeEventRouteMetadata.create(1, 0, 0)),
+      dataForRouting(1, 1, null), dataForRouting(1, 3, EventRouteMetadata.create(1, new int[]{0})),
+      dataForInputError(1, 0, 0), dataForDest(1, 1), dataForSrc(1, 1), dataForSrc(1, 3));
+    testEdgeManager(new CartesianProductEdgeManagerConfig(false, new String[]{"v0","v1"}, null,
+        new int[]{2,3}, 3, 0, null), "v1", 3, null,
+      dataForRouting(1, 2, null), dataForRouting(1, 1, CompositeEventRouteMetadata.create(1, 0, 0)),
+      dataForRouting(1, 2, null), dataForRouting(1, 1, EventRouteMetadata.create(1, new int[]{0})),
+      dataForInputError(1,0,1), dataForDest(1, 1), dataForSrc(1, 1), dataForSrc(1, 2));
   }
 
-  private void testThreeWayV2(CartesianProductEdgeManagerConfig config) throws Exception {
-    when(mockContext.getSourceVertexName()).thenReturn("v2");
-    when(mockContext.getSourceVertexNumTasks()).thenReturn(4);
-    edgeManager.initialize(config);
-
-    CompositeEventRouteMetadata compositeRoutingData = edgeManager.routeCompositeDataMovementEventToDestination(1, 0);
-    assertNull(compositeRoutingData);
-
-    compositeRoutingData = edgeManager.routeCompositeDataMovementEventToDestination(1, 13);
-    assertNotNull(compositeRoutingData);
-    assertEquals(1, compositeRoutingData.getCount());
-    assertEquals(0, compositeRoutingData.getTarget());
-    assertEquals(0, compositeRoutingData.getSource());
-
-    EventRouteMetadata routingData = edgeManager.routeInputSourceTaskFailedEventToDestination(1, 0);
-    assertNull(routingData);
-
-    routingData = edgeManager.routeInputSourceTaskFailedEventToDestination(1, 13);
-    assertNotNull(routingData);
-    assertEquals(1, routingData.getNumEvents());
-    assertArrayEquals(new int[]{0}, routingData.getTargetIndices());
-
-    assertEquals(1, edgeManager.routeInputErrorEventToSource(1, 0));
-
-    assertEquals(1, edgeManager.getNumDestinationTaskPhysicalInputs(1));
-    assertEquals(1, edgeManager.getNumSourceTaskPhysicalOutputs(1));
-    assertEquals(6, edgeManager.getNumDestinationConsumerTasks(1));
+  /**
+   * Vertex v0 has 2 tasks
+   * Vertex v1 has 3 tasks
+   * Vertex v2 has 4 tasks
+   */
+  @Test(timeout = 5000)
+  public void testThreeWayAllVertex() throws Exception {
+    testEdgeManager(new CartesianProductEdgeManagerConfig(false, new String[]{"v0","v1","v2"},
+      null, new int[]{2,3,4}, 2, 0, null), "v0", 2, null,
+      dataForRouting(1, 1, null), dataForRouting(1, 12, CompositeEventRouteMetadata.create(1, 0, 0)),
+      dataForRouting(1, 1, null), dataForRouting(1, 12, EventRouteMetadata.create(1, new int[]{0})),
+      dataForInputError(1, 0, 0), dataForDest(1, 1), dataForSrc(1, 1), dataForSrc(1, 12));
+    testEdgeManager(new CartesianProductEdgeManagerConfig(false, new String[]{"v0","v1","v2"},
+        null, new int[]{2,3,4}, 3, 0, null), "v1", 3, null,
+      dataForRouting(1, 1, null), dataForRouting(1, 16, CompositeEventRouteMetadata.create(1, 0, 0)),
+      dataForRouting(1, 1, null), dataForRouting(1, 16, EventRouteMetadata.create(1, new int[]{0})),
+      dataForInputError(1, 0, 0), dataForDest(1, 1), dataForSrc(1, 1), dataForSrc(1, 8));
+    testEdgeManager(new CartesianProductEdgeManagerConfig(false, new String[]{"v0","v1","v2"},
+        null, new int[]{2,3,4}, 4, 0, null), "v2", 4, null,
+      dataForRouting(1, 0, null), dataForRouting(1, 13, CompositeEventRouteMetadata.create(1, 0, 0)),
+      dataForRouting(1, 0, null), dataForRouting(1, 13, EventRouteMetadata.create(1, new int[]{0})),
+      dataForInputError(1, 0, 1), dataForDest(1, 1), dataForSrc(1, 1), dataForSrc(1, 6));
   }
 
   @Test(timeout = 5000)
   public void testZeroSrcTask() {
     CartesianProductEdgeManagerConfig emConfig =
       new CartesianProductEdgeManagerConfig(false, new String[]{"v0", "v1"}, null,
-        new int[]{2,0}, new int[]{2,0}, null);
+        new int[]{2,0}, 0,0, null);
     testZeroSrcTaskV0(emConfig);
     testZeroSrcTaskV1(emConfig);
   }
@@ -240,67 +186,103 @@ public class TestCartesianProductEdgeManagerUnpartitioned {
   }
 
   /**
-   * Vertex v0 has 20 tasks 10 groups
-   * Vertex v1 has 10 tasks 1 group
+   * Vertex v0 has 10 tasks 2 groups
+   * Vertex v1 has 30 tasks 3 group
    */
   @Test(timeout = 5000)
-  public void testTwoWayAutoGrouping() throws Exception {
-    CartesianProductEdgeManagerConfig emConfig =
-      new CartesianProductEdgeManagerConfig(false, new String[]{"v0","v1"}, null,
-        new int[]{20, 10}, new int[]{10,1}, null);
-    testTwoWayAutoGroupingV0(emConfig);
-    testTwoWayAutoGroupingV1(emConfig);
+  public void testTwoWayAllVertexAutoGrouping() throws Exception {
+    testEdgeManager(new CartesianProductEdgeManagerConfig(false, new String[]{"v0","v1"},
+        null, new int[]{2,3}, 2, 0, null), "v0", 10, null,
+      dataForRouting(6, 1, null), dataForRouting(1, 0, CompositeEventRouteMetadata.create(1, 1, 0)),
+      dataForRouting(6, 1, null), dataForRouting(1, 0, EventRouteMetadata.create(1, new int[]{1})),
+      dataForInputError(1, 1, 1), dataForDest(1, 5), dataForSrc(1, 1), dataForSrc(1, 3));
+    testEdgeManager(new CartesianProductEdgeManagerConfig(false, new String[]{"v0","v1"},
+        null, new int[]{2,3}, 3, 0, null), "v1", 30, null,
+      dataForRouting(6, 1, null), dataForRouting(11, 1, CompositeEventRouteMetadata.create(1, 1, 0)),
+      dataForRouting(6, 1, null), dataForRouting(11, 1, EventRouteMetadata.create(1, new int[]{1})),
+      dataForInputError(1, 1, 11), dataForDest(1, 10), dataForSrc(1, 1), dataForSrc(1, 2));
   }
 
-  private void testTwoWayAutoGroupingV0(CartesianProductEdgeManagerConfig config) throws Exception {
-    when(mockContext.getSourceVertexName()).thenReturn("v0");
-    when(mockContext.getSourceVertexNumTasks()).thenReturn(20);
-    edgeManager.initialize(config);
-
-    CompositeEventRouteMetadata compositeRoutingData = edgeManager.routeCompositeDataMovementEventToDestination(1, 1);
-    assertNull(compositeRoutingData);
-
-    compositeRoutingData = edgeManager.routeCompositeDataMovementEventToDestination(1, 0);
-    assertNotNull(compositeRoutingData);
-    assertEquals(1, compositeRoutingData.getCount());
-    assertEquals(1, compositeRoutingData.getTarget());
-    assertEquals(0, compositeRoutingData.getSource());
-
-    EventRouteMetadata routingData = edgeManager.routeInputSourceTaskFailedEventToDestination(2, 2);
-    assertNull(routingData);
-
-    routingData = edgeManager.routeInputSourceTaskFailedEventToDestination(2, 1);
-    assertNotNull(routingData);
-    assertEquals(1, routingData.getNumEvents());
-    assertArrayEquals(new int[]{0}, routingData.getTargetIndices());
-
-    assertEquals(7, edgeManager.routeInputErrorEventToSource(3, 1));
-
-    assertEquals(2, edgeManager.getNumDestinationTaskPhysicalInputs(4));
-    assertEquals(1, edgeManager.getNumSourceTaskPhysicalOutputs(5));
-    assertEquals(1, edgeManager.getNumDestinationConsumerTasks(6));
+  /**
+   * v0 with group g0 {v1, v2}
+   * Vertex v0 has 2 tasks
+   * Vertex v1 has 1 tasks
+   * Vertex v2 has 2 tasks
+   */
+  @Test(timeout = 5000)
+  public void testTwoWayVertexWithVertexGroup() throws Exception {
+    testEdgeManager(new CartesianProductEdgeManagerConfig(false, new String[]{"v0","g0"},
+        null, new int[]{2,3}, 2, 0, null), "v0", 2, null,
+      dataForRouting(1, 1, null), dataForRouting(1, 3, CompositeEventRouteMetadata.create(1, 0, 0)),
+      dataForRouting(1, 1, null), dataForRouting(1, 3, EventRouteMetadata.create(1, new int[]{0})),
+      dataForInputError(1, 0, 0), dataForDest(1, 1), dataForSrc(1, 1), dataForSrc(1, 3));
+    testEdgeManager(new CartesianProductEdgeManagerConfig(false, new String[]{"v0","g0"},
+        null, new int[]{2,3}, 1, 0, null), "v1", 1, "g0",
+      dataForRouting(0, 1, null), dataForRouting(0, 3, CompositeEventRouteMetadata.create(1, 0, 0)),
+      dataForRouting(0, 1, null), dataForRouting(0, 3, EventRouteMetadata.create(1, new int[]{0})),
+      dataForInputError(3, 0, 0), dataForDest(0, 1), dataForSrc(0, 1), dataForSrc(0, 2));
+    testEdgeManager(new CartesianProductEdgeManagerConfig(false, new String[]{"v0","g0"},
+        null, new int[]{2,3}, 2, 1, null), "v2", 2, "g0",
+      dataForRouting(1, 1, null), dataForRouting(0, 1, CompositeEventRouteMetadata.create(1, 0, 0)),
+      dataForRouting(1, 1, null), dataForRouting(0, 1, EventRouteMetadata.create(1, new int[]{0})),
+      dataForInputError(1, 0, 0), dataForDest(1, 1), dataForSrc(1, 1), dataForSrc(1, 2));
   }
 
-  private void testTwoWayAutoGroupingV1(CartesianProductEdgeManagerConfig config) throws Exception {
-    when(mockContext.getSourceVertexName()).thenReturn("v1");
-    when(mockContext.getSourceVertexNumTasks()).thenReturn(10);
-    edgeManager.initialize(config);
-
-    CompositeEventRouteMetadata compositeRoutingData = edgeManager.routeCompositeDataMovementEventToDestination(1, 1);
-    assertNotNull(compositeRoutingData);
-    assertEquals(1, compositeRoutingData.getCount());
-    assertEquals(1, compositeRoutingData.getTarget());
-    assertEquals(0, compositeRoutingData.getSource());
-
-    EventRouteMetadata routingData = edgeManager.routeInputSourceTaskFailedEventToDestination(2, 3);
-    assertNotNull(routingData);
-    assertEquals(1, routingData.getNumEvents());
-    assertArrayEquals(new int[]{2}, routingData.getTargetIndices());
+  /**
+   * group g0 {v1, v2} with group g1 {v3, v4}
+   *
+   * Vertex v0 has 1 tasks
+   * Vertex v1 has 2 tasks
+   * Vertex v2 has 3 tasks
+   * Vertex v3 has 4 tasks
+   */
+  @Test(timeout = 5000)
+  public void testTwoWayAllVertexGroup() throws Exception {
+    testEdgeManager(new CartesianProductEdgeManagerConfig(false, new String[]{"g0","g1"},
+        null, new int[]{3,7}, 1, 0, null), "v0", 1, "g0",
+      dataForRouting(0, 7, null), dataForRouting(0, 1, CompositeEventRouteMetadata.create(1, 0, 0)),
+      dataForRouting(0, 7, null), dataForRouting(0, 1, EventRouteMetadata.create(1, new int[]{0})),
+      dataForInputError(1, 0, 0), dataForDest(1, 1), dataForSrc(0, 1), dataForSrc(0, 7));
+    testEdgeManager(new CartesianProductEdgeManagerConfig(false, new String[]{"g0","g1"},
+        null, new int[]{3,7}, 2, 1, null), "v1", 2, "g0",
+      dataForRouting(0, 1, null), dataForRouting(1, 15, CompositeEventRouteMetadata.create(1, 0, 0)),
+      dataForRouting(0, 1, null), dataForRouting(1, 15, EventRouteMetadata.create(1, new int[]{0})),
+      dataForInputError(7, 0, 0), dataForDest(7, 1), dataForSrc(1, 1), dataForSrc(1, 7));
+    testEdgeManager(new CartesianProductEdgeManagerConfig(false, new String[]{"g0","g1"},
+        null, new int[]{3,7}, 3, 0, null), "v2", 3, "g1",
+      dataForRouting(1, 0, null), dataForRouting(1, 1, CompositeEventRouteMetadata.create(1, 0, 0)),
+      dataForRouting(1, 0, null), dataForRouting(1, 1, EventRouteMetadata.create(1, new int[]{0})),
+      dataForInputError(1, 0, 1), dataForDest(1, 1), dataForSrc(1, 1), dataForSrc(1, 3));
+    testEdgeManager(new CartesianProductEdgeManagerConfig(false, new String[]{"g0","g1"},
+        null, new int[]{3,7}, 4, 3, null), "v3", 4, "g1",
+      dataForRouting(0, 1, null), dataForRouting(1, 4, CompositeEventRouteMetadata.create(1, 0, 0)),
+      dataForRouting(0, 1, null), dataForRouting(1, 4, EventRouteMetadata.create(1, new int[]{0})),
+      dataForInputError(4, 0, 1), dataForDest(4, 1), dataForSrc(1, 1), dataForSrc(1, 3));
+  }
 
-    assertEquals(5, edgeManager.routeInputErrorEventToSource(4, 5));
 
-    assertEquals(10, edgeManager.getNumDestinationTaskPhysicalInputs(6));
-    assertEquals(1, edgeManager.getNumSourceTaskPhysicalOutputs(7));
-    assertEquals(10, edgeManager.getNumDestinationConsumerTasks(8));
+  /**
+   * v0 with group g0 {v1, v2}
+   * Vertex v0 has 10 tasks, 2 groups
+   * Vertex v1 has 10 tasks, 1 group
+   * Vertex v2 has 20 tasks, 2 groups
+   */
+  @Test(timeout = 5000)
+  public void testTwoWayWithVertexGroupAutoGrouping() throws Exception {
+    testEdgeManager(new CartesianProductEdgeManagerConfig(false, new String[]{"v0","g0"},
+        null, new int[]{2,3}, 2, 0, null), "v0", 10, null,
+      dataForRouting(0, 4, null), dataForRouting(2, 1, CompositeEventRouteMetadata.create(1, 2, 0)),
+      dataForRouting(0, 4, null), dataForRouting(2, 1, EventRouteMetadata.create(1, new int[]{2})),
+      dataForInputError(1, 3, 3), dataForDest(1, 5), dataForSrc(1, 1), dataForSrc(1, 3));
+    testEdgeManager(new CartesianProductEdgeManagerConfig(false, new String[]{"v0","g0"},
+        null, new int[]{2,3}, 1, 0, null), "v1", 10, "g0",
+      dataForRouting(1, 1, null), dataForRouting(2, 3, CompositeEventRouteMetadata.create(1, 2, 0)),
+      dataForRouting(1, 1, null), dataForRouting(2, 3, EventRouteMetadata.create(1, new int[]{2})),
+      dataForInputError(3, 1, 1), dataForDest(0, 10), dataForSrc(1, 1), dataForSrc(1, 2));
+    testEdgeManager(new CartesianProductEdgeManagerConfig(false, new String[]{"v0","g0"},
+        null, new int[]{2,3}, 2, 1, null), "v2", 20, "g0",
+      dataForRouting(11, 1, null), dataForRouting(12, 2, CompositeEventRouteMetadata.create(1, 2, 0)),
+      dataForRouting(11, 1, null), dataForRouting(12, 2, EventRouteMetadata.create(1, new int[]{2})),
+      dataForInputError(2, 2, 12), dataForDest(1, 10), dataForSrc(1, 1), dataForSrc(1, 2));
   }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tez/blob/f355a050/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductVertexManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductVertexManager.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductVertexManager.java
index f3a5851..5144e69 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductVertexManager.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductVertexManager.java
@@ -29,6 +29,7 @@ import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -136,6 +137,24 @@ public class TestCartesianProductVertexManager {
   }
 
   @Test(timeout = 5000)
+  public void testCheckDAGConfigConsistentWithVertexGroup() throws Exception {
+    // positive case
+    edgePropertyMap.put("v2", cpEdge);
+    config = new CartesianProductConfig(new int[]{2, 3}, new String[]{"v0", "g0"}, null);
+    Map<String, List<String>> vertexGroups = new HashMap<>();
+    vertexGroups.put("g0", Arrays.asList("v1", "v2"));
+    when(context.getInputVertexGroups()).thenReturn(vertexGroups);
+    vertexManager.initialize();
+
+    // vertex group is in cartesian product config, but one member doesn't have cp edge
+    edgePropertyMap.put("v2", broadcastEdge);
+    try {
+      vertexManager.initialize();
+      assertTrue(false);
+    } catch (Exception ignored) {}
+  }
+
+  @Test(timeout = 5000)
   public void testOtherEdgeType() throws Exception {
     // forbid other custom edge
     edgePropertyMap.put("v2", customEdge);

http://git-wip-us.apache.org/repos/asf/tez/blob/f355a050/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductVertexManagerConfig.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductVertexManagerConfig.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductVertexManagerConfig.java
index bf369d9..5c6ffa7 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductVertexManagerConfig.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductVertexManagerConfig.java
@@ -39,15 +39,15 @@ public class TestCartesianProductVertexManagerConfig {
     CartesianProductVertexManagerConfig vmConf =
       CartesianProductVertexManagerConfig.fromUserPayload(config.toUserPayload(conf));
     assertEquals(CartesianProductVertexManager.TEZ_CARTESIAN_PRODUCT_ENABLE_AUTO_GROUPING_DEFAULT,
-      vmConf.isEnableAutoGrouping());
+      vmConf.enableAutoGrouping);
     assertEquals(CartesianProductVertexManager.TEZ_CARTESIAN_PRODUCT_DESIRED_BYTES_PER_GROUP_DEFAULT,
-      vmConf.getDesiredBytesPerGroup());
+      vmConf.desiredBytesPerChunk);
 
     // auto group set in proto
     conf.setBoolean(CartesianProductVertexManager.TEZ_CARTESIAN_PRODUCT_ENABLE_AUTO_GROUPING, true);
     conf.setLong(CartesianProductVertexManager.TEZ_CARTESIAN_PRODUCT_DESIRED_BYTES_PER_GROUP, 1000);
     vmConf = CartesianProductVertexManagerConfig.fromUserPayload(config.toUserPayload(conf));
-    assertEquals(true, vmConf.isEnableAutoGrouping());
-    assertEquals(1000, vmConf.getDesiredBytesPerGroup());
+    assertEquals(true, vmConf.enableAutoGrouping);
+    assertEquals(1000, vmConf.desiredBytesPerChunk);
   }
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/f355a050/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductVertexManagerUnpartitioned.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductVertexManagerUnpartitioned.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductVertexManagerUnpartitioned.java
index d2ce378..f95daa7 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductVertexManagerUnpartitioned.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductVertexManagerUnpartitioned.java
@@ -17,9 +17,9 @@
  */
 package org.apache.tez.runtime.library.cartesianproduct;
 
+import com.google.protobuf.InvalidProtocolBufferException;
 import org.apache.tez.dag.api.EdgeManagerPluginDescriptor;
 import org.apache.tez.dag.api.EdgeProperty;
-import org.apache.tez.dag.api.UserPayload;
 import org.apache.tez.dag.api.VertexLocationHint;
 import org.apache.tez.dag.api.VertexManagerPluginContext;
 import org.apache.tez.dag.api.VertexManagerPluginContext.ScheduleTaskRequest;
@@ -37,11 +37,10 @@ import org.junit.Before;
 import org.junit.Test;
 import org.mockito.ArgumentCaptor;
 import org.mockito.Captor;
-import org.mockito.Matchers;
 import org.mockito.MockitoAnnotations;
 
 import java.util.ArrayList;
-import java.util.Formatter;
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -49,8 +48,6 @@ import java.util.Map;
 import static org.apache.tez.dag.api.EdgeProperty.DataMovementType.BROADCAST;
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyInt;
 import static org.mockito.Matchers.anyMapOf;
@@ -63,150 +60,385 @@ import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 public class TestCartesianProductVertexManagerUnpartitioned {
+  private static long desiredBytesPerGroup = 1000;
   @Captor
   private ArgumentCaptor<Map<String, EdgeProperty>> edgePropertiesCaptor;
   @Captor
-  private ArgumentCaptor<List<ScheduleTaskRequest>> scheduleTaskRequestCaptor;
+  private ArgumentCaptor<List<ScheduleTaskRequest>> scheduleRequestCaptor;
+  @Captor
+  private ArgumentCaptor<Integer> parallelismCaptor;
   private CartesianProductVertexManagerUnpartitioned vertexManager;
-  private VertexManagerPluginContext context;
-  private List<TaskAttemptIdentifier> allCompletions;
+  private VertexManagerPluginContext ctx;
 
   @Before
-  public void setup() throws Exception {
+  public void setup() {
     MockitoAnnotations.initMocks(this);
-    context = mock(VertexManagerPluginContext.class);
-    vertexManager = new CartesianProductVertexManagerUnpartitioned(context);
+    ctx = mock(VertexManagerPluginContext.class);
+    vertexManager = new CartesianProductVertexManagerUnpartitioned(ctx);
+  }
 
-    Map<String, EdgeProperty> edgePropertyMap = new HashMap<>();
-    edgePropertyMap.put("v0", EdgeProperty.create(EdgeManagerPluginDescriptor.create(
-        CartesianProductEdgeManager.class.getName()), null, null, null, null));
-    edgePropertyMap.put("v1", EdgeProperty.create(EdgeManagerPluginDescriptor.create(
-      CartesianProductEdgeManager.class.getName()), null, null, null, null));
+  /**
+   * v0 and v1 are two cartesian product sources
+   */
+  private void setupDAGVertexOnly(boolean doGrouping) throws Exception {
+    when(ctx.getInputVertexEdgeProperties()).thenReturn(getEdgePropertyMap(2));
+    setSrcParallelism(ctx, doGrouping ? 10 : 1, 2, 3);
+
+    CartesianProductVertexManagerConfig config = new CartesianProductVertexManagerConfig(
+      false, new String[]{"v0","v1"}, null, 0, 0, doGrouping, desiredBytesPerGroup, null);
+    vertexManager.initialize(config);
+  }
+
+  /**
+   * v0 and v1 are two cartesian product sources; v2 is broadcast source; without auto grouping
+   */
+  private void setupDAGVertexOnlyWithBroadcast() throws Exception {
+    Map<String, EdgeProperty> edgePropertyMap = getEdgePropertyMap(2);
     edgePropertyMap.put("v2", EdgeProperty.create(BROADCAST, null, null, null, null));
-    when(context.getInputVertexEdgeProperties()).thenReturn(edgePropertyMap);
-    when(context.getVertexNumTasks(eq("v0"))).thenReturn(2);
-    when(context.getVertexNumTasks(eq("v1"))).thenReturn(3);
-    when(context.getVertexNumTasks(eq("v2"))).thenReturn(5);
+    when(ctx.getInputVertexEdgeProperties()).thenReturn(edgePropertyMap);
+    setSrcParallelism(ctx, 2, 3, 5);
 
     CartesianProductVertexManagerConfig config =
       new CartesianProductVertexManagerConfig(
         false, new String[]{"v0","v1"}, null, 0, 0, false, 0, null);
     vertexManager.initialize(config);
-    allCompletions = new ArrayList<>();
-    allCompletions.add(new TaskAttemptIdentifierImpl("dag", "v0",
-      TezTaskAttemptID.getInstance(TezTaskID.getInstance(TezVertexID.getInstance(
-        TezDAGID.getInstance("0", 0, 0), 0), 0), 0)));
-    allCompletions.add(new TaskAttemptIdentifierImpl("dag", "v0",
-      TezTaskAttemptID.getInstance(TezTaskID.getInstance(TezVertexID.getInstance(
-        TezDAGID.getInstance("0", 0, 0), 0), 1), 0)));
-    allCompletions.add(new TaskAttemptIdentifierImpl("dag", "v1",
-      TezTaskAttemptID.getInstance(TezTaskID.getInstance(TezVertexID.getInstance(
-        TezDAGID.getInstance("0", 0, 0), 1), 0), 0)));
-    allCompletions.add(new TaskAttemptIdentifierImpl("dag", "v1",
-      TezTaskAttemptID.getInstance(TezTaskID.getInstance(TezVertexID.getInstance(
-        TezDAGID.getInstance("0", 0, 0), 1), 1), 0)));
-    allCompletions.add(new TaskAttemptIdentifierImpl("dag", "v1",
-      TezTaskAttemptID.getInstance(TezTaskID.getInstance(TezVertexID.getInstance(
-        TezDAGID.getInstance("0", 0, 0), 1), 2), 0)));
-    allCompletions.add(new TaskAttemptIdentifierImpl("dag", "v2",
+  }
+
+  /**
+   * v0 and g0 are two sources; g0 is vertex group of v1 and v2
+   */
+  private void setupDAGVertexGroup(boolean doGrouping) throws Exception {
+    when(ctx.getInputVertexEdgeProperties()).thenReturn(getEdgePropertyMap(3));
+    setSrcParallelism(ctx, doGrouping ? 10: 1, 2, 3, 4);
+
+    Map<String, List<String>> vertexGroupMap = new HashMap<>();
+    vertexGroupMap.put("g0", Arrays.asList("v1", "v2"));
+    when(ctx.getInputVertexGroups()).thenReturn(vertexGroupMap);
+
+    CartesianProductVertexManagerConfig config = new CartesianProductVertexManagerConfig(
+      false, new String[]{"v0","g0"}, null, 0, 0, doGrouping, desiredBytesPerGroup, null);
+    vertexManager.initialize(config);
+  }
+
+  /**
+   * g0 and g1 are two sources; g0 is vertex group of v0 and v1; g1 is vertex group of v2 and v3
+   */
+  private void setupDAGVertexGroupOnly(boolean doGrouping) throws Exception {
+    when(ctx.getInputVertexEdgeProperties()).thenReturn(getEdgePropertyMap(4));
+    setSrcParallelism(ctx, doGrouping ? 10 : 1, 2, 3, 4, 5);
+
+    Map<String, List<String>> vertexGroupMap = new HashMap<>();
+    vertexGroupMap.put("g0", Arrays.asList("v0", "v1"));
+    vertexGroupMap.put("g1", Arrays.asList("v2", "v3"));
+    when(ctx.getInputVertexGroups()).thenReturn(vertexGroupMap);
+
+    CartesianProductVertexManagerConfig config = new CartesianProductVertexManagerConfig(
+      false, new String[]{"g0","g1"}, null, 0, 0, doGrouping, desiredBytesPerGroup, null);
+    vertexManager.initialize(config);
+  }
+
+  private Map<String, EdgeProperty> getEdgePropertyMap(int numSrcV) {
+    Map<String, EdgeProperty> edgePropertyMap = new HashMap<>();
+    for (int i = 0; i < numSrcV; i++) {
+      edgePropertyMap.put("v"+i, EdgeProperty.create(EdgeManagerPluginDescriptor.create(
+        CartesianProductEdgeManager.class.getName()), null, null, null, null));
+    }
+    return edgePropertyMap;
+  }
+
+  private void setSrcParallelism(VertexManagerPluginContext ctx, int multiplier, int... numTasks) {
+    int i = 0;
+    for (int numTask : numTasks) {
+      when(ctx.getVertexNumTasks(eq("v"+i))).thenReturn(numTask * multiplier);
+      i++;
+    }
+  }
+
+  private TaskAttemptIdentifier getTaId(String vertexName, int taskId) {
+    return new TaskAttemptIdentifierImpl("dag", vertexName,
       TezTaskAttemptID.getInstance(TezTaskID.getInstance(TezVertexID.getInstance(
-        TezDAGID.getInstance("0", 0, 0), 3), 0), 0)));
+        TezDAGID.getInstance("0", 0, 0), 0), taskId), 0));
+  }
+
+  private VertexManagerEvent getVMEevnt(long outputSize, String vName, int taskId) {
+
+    VertexManagerEventPayloadProto.Builder builder = VertexManagerEventPayloadProto.newBuilder();
+    builder.setOutputSize(outputSize);
+    VertexManagerEvent vmEvent =
+      VertexManagerEvent.create("cp vertex", builder.build().toByteString().asReadOnlyByteBuffer());
+    vmEvent.setProducerAttemptIdentifier(getTaId(vName, taskId));
+    return vmEvent;
+  }
+
+  private void verifyEdgeProperties(EdgeProperty edgeProperty, String[] sources,
+                                    int[] numChunksPerSrc, int numChunk, int chunkIdOffset)
+    throws InvalidProtocolBufferException {
+    CartesianProductEdgeManagerConfig conf = CartesianProductEdgeManagerConfig.fromUserPayload(
+      edgeProperty.getEdgeManagerDescriptor().getUserPayload());
+    assertArrayEquals(sources, conf.getSourceVertices().toArray());
+    assertArrayEquals(numChunksPerSrc, conf.numChunksPerSrc);
+    assertEquals(numChunk, conf.numChunk);
+    assertEquals(chunkIdOffset, conf.chunkIdOffset);
+  }
+
+  private void verifyScheduleRequest(int expectedTimes, int... expectedTid) {
+    verify(ctx, times(expectedTimes)).scheduleTasks(scheduleRequestCaptor.capture());
+    if (expectedTimes > 0) {
+      List<ScheduleTaskRequest> requests = scheduleRequestCaptor.getValue();
+      int i = 0;
+      for (int tid : expectedTid) {
+        assertEquals(tid, requests.get(i).getTaskIndex());
+        i++;
+      }
+    }
   }
 
   @Test(timeout = 5000)
-  public void testReconfigureVertex() throws Exception {
-    ArgumentCaptor<Integer> parallelismCaptor = ArgumentCaptor.forClass(Integer.class);
+  public void testDAGVertexOnly() throws Exception {
+    setupDAGVertexOnly(false);
+
     vertexManager.onVertexStateUpdated(new VertexStateUpdate("v0", VertexState.CONFIGURED));
-    verify(context, never()).reconfigureVertex(
+    verify(ctx, never()).reconfigureVertex(
       anyInt(), any(VertexLocationHint.class), anyMapOf(String.class, EdgeProperty.class));
+
     vertexManager.onVertexStateUpdated(new VertexStateUpdate("v1", VertexState.CONFIGURED));
-    verify(context, times(1)).reconfigureVertex(parallelismCaptor.capture(),
+    verify(ctx, times(1)).reconfigureVertex(parallelismCaptor.capture(),
       isNull(VertexLocationHint.class), edgePropertiesCaptor.capture());
-    assertEquals(6, (int)parallelismCaptor.getValue());
+    assertEquals(6, (int) parallelismCaptor.getValue());
     Map<String, EdgeProperty> edgeProperties = edgePropertiesCaptor.getValue();
-    assertFalse(edgeProperties.containsKey("v2"));
-    for (EdgeProperty edgeProperty : edgeProperties.values()) {
-      UserPayload payload = edgeProperty.getEdgeManagerDescriptor().getUserPayload();
-      CartesianProductEdgeManagerConfig newConfig =
-        CartesianProductEdgeManagerConfig.fromUserPayload(payload);
-      assertArrayEquals(new int[]{2,3}, newConfig.getNumTasks());
-      assertArrayEquals(new int[]{2,3}, newConfig.getNumGroups());
+    verifyEdgeProperties(edgeProperties.get("v0"), new String[]{"v0", "v1"}, new int[]{2, 3}, 2, 0);
+    verifyEdgeProperties(edgeProperties.get("v1"), new String[]{"v0", "v1"}, new int[]{2, 3}, 3, 0);
+
+    vertexManager.onVertexStarted(null);
+    verifyScheduleRequest(0);
+    vertexManager.onSourceTaskCompleted(getTaId("v0", 0));
+    verify(ctx, never()).scheduleTasks(scheduleRequestCaptor.capture());
+    vertexManager.onSourceTaskCompleted(getTaId("v1", 0));
+    verify(ctx, times(1)).scheduleTasks(scheduleRequestCaptor.capture());
+    verifyScheduleRequest(1, 0);
+
+    vertexManager.onSourceTaskCompleted(getTaId("v1", 1));
+    verify(ctx, times(2)).scheduleTasks(scheduleRequestCaptor.capture());
+    verifyScheduleRequest(2, 1);
+  }
+
+  @Test(timeout = 5000)
+  public void testDAGVertexOnlyWithGrouping() throws Exception {
+    setupDAGVertexOnly(true);
+    vertexManager.onVertexStateUpdated(new VertexStateUpdate("v0", VertexState.CONFIGURED));
+    vertexManager.onVertexStateUpdated(new VertexStateUpdate("v1", VertexState.CONFIGURED));
+
+    vertexManager.onVertexManagerEventReceived(getVMEevnt(desiredBytesPerGroup, "v0", 0));
+    verify(ctx, never()).reconfigureVertex(
+      anyInt(), any(VertexLocationHint.class), anyMapOf(String.class, EdgeProperty.class));
+
+    vertexManager.onVertexManagerEventReceived(getVMEevnt(1, "v1", 0));
+    verify(ctx, never()).reconfigureVertex(
+      anyInt(), any(VertexLocationHint.class), anyMapOf(String.class, EdgeProperty.class));
+
+    vertexManager.onVertexManagerEventReceived(getVMEevnt(0, "v0", 1));
+    for (int i = 1; i < 30; i++) {
+      vertexManager.onVertexManagerEventReceived(getVMEevnt(1, "v1", i));
+    }
+    verify(ctx, times(1)).reconfigureVertex(
+      anyInt(), any(VertexLocationHint.class), edgePropertiesCaptor.capture());
+    Map<String, EdgeProperty> edgeProperties = edgePropertiesCaptor.getValue();
+    verifyEdgeProperties(edgeProperties.get("v0"), new String[]{"v0", "v1"}, new int[]{10, 1}, 10, 0);
+    verifyEdgeProperties(edgeProperties.get("v1"), new String[]{"v0", "v1"}, new int[]{10, 1}, 1, 0);
+
+    vertexManager.onVertexStarted(null);
+    verifyScheduleRequest(0);
+    vertexManager.onSourceTaskCompleted(getTaId("v0", 0));
+    vertexManager.onSourceTaskCompleted(getTaId("v0", 1));
+    for (int i = 0; i < 29; i++) {
+      vertexManager.onSourceTaskCompleted(getTaId("v1", i));
     }
+    verifyScheduleRequest(0);
+    vertexManager.onSourceTaskCompleted(getTaId("v1", 29));
+    verifyScheduleRequest(1, 0);
   }
 
   @Test(timeout = 5000)
-  public void testOnSourceTaskComplete() throws Exception {
+  public void testDAGVertexGroup() throws Exception {
+    setupDAGVertexGroup(false);
+
     vertexManager.onVertexStateUpdated(new VertexStateUpdate("v0", VertexState.CONFIGURED));
     vertexManager.onVertexStateUpdated(new VertexStateUpdate("v1", VertexState.CONFIGURED));
+    verify(ctx, never()).reconfigureVertex(
+      anyInt(), any(VertexLocationHint.class), anyMapOf(String.class, EdgeProperty.class));
+
+    vertexManager.onVertexStateUpdated(new VertexStateUpdate("v2", VertexState.CONFIGURED));
+    verify(ctx, times(1)).reconfigureVertex(parallelismCaptor.capture(),
+      isNull(VertexLocationHint.class), edgePropertiesCaptor.capture());
+    assertEquals(14, (int) parallelismCaptor.getValue());
+    Map<String, EdgeProperty> edgeProperties = edgePropertiesCaptor.getValue();
+
+    verifyEdgeProperties(edgeProperties.get("v0"), new String[]{"v0", "g0"}, new int[]{2, 7}, 2, 0);
+    verifyEdgeProperties(edgeProperties.get("v1"), new String[]{"v0", "g0"}, new int[]{2, 7}, 3, 0);
+    verifyEdgeProperties(edgeProperties.get("v2"), new String[]{"v0", "g0"}, new int[]{2, 7}, 4, 3);
+
     vertexManager.onVertexStarted(null);
-    verify(context, never()).scheduleTasks(Matchers.<List<ScheduleTaskRequest>>any());
-    vertexManager.onSourceTaskCompleted(allCompletions.get(0));
-    verify(context, never()).scheduleTasks(Matchers.<List<ScheduleTaskRequest>>any());
-    vertexManager.onSourceTaskCompleted(allCompletions.get(2));
-    // cannot start schedule because broadcast vertex isn't in RUNNING state
-    verify(context, never()).scheduleTasks(Matchers.<List<ScheduleTaskRequest>>any());
+    verifyScheduleRequest(0);
+    vertexManager.onSourceTaskCompleted(getTaId("v0",1));
+    vertexManager.onSourceTaskCompleted(getTaId("v1",2));
+    verifyScheduleRequest(1, 9);
+    vertexManager.onSourceTaskCompleted(getTaId("v2", 0));
+    verifyScheduleRequest(2, 10);
+  }
 
-    vertexManager.onVertexStateUpdated(new VertexStateUpdate("v2", VertexState.RUNNING));
-    verify(context, times(1)).scheduleTasks(scheduleTaskRequestCaptor.capture());
-    List<ScheduleTaskRequest> requests = scheduleTaskRequestCaptor.getValue();
-    assertNotNull(requests);
-    assertEquals(1, requests.size());
-    assertEquals(0, requests.get(0).getTaskIndex());
-
-    // v2 completion shouldn't matter
-    vertexManager.onSourceTaskCompleted(allCompletions.get(5));
-    verify(context, times(1)).scheduleTasks(scheduleTaskRequestCaptor.capture());
-
-    vertexManager.onSourceTaskCompleted(allCompletions.get(3));
-    verify(context, times(2)).scheduleTasks(scheduleTaskRequestCaptor.capture());
-    requests = scheduleTaskRequestCaptor.getValue();
-    assertNotNull(requests);
-    assertEquals(1, requests.size());
-    assertEquals(1, requests.get(0).getTaskIndex());
+  @Test(timeout = 5000)
+  public void testDAGVertexGroupWithGrouping() throws Exception {
+    setupDAGVertexGroup(true);
+
+    for (int i = 0; i < 3; i++) {
+      vertexManager.onVertexStateUpdated(new VertexStateUpdate("v" + i, VertexState.CONFIGURED));
+    }
+
+    vertexManager.onVertexManagerEventReceived(getVMEevnt(desiredBytesPerGroup, "v0", 0));
+    vertexManager.onVertexManagerEventReceived(getVMEevnt(desiredBytesPerGroup, "v1", 0));
+    verify(ctx, never()).reconfigureVertex(
+      anyInt(), any(VertexLocationHint.class), anyMapOf(String.class, EdgeProperty.class));
+
+    vertexManager.onVertexManagerEventReceived(getVMEevnt(0, "v0", 1));
+    for (int i = 0; i < 40; i++) {
+      vertexManager.onVertexManagerEventReceived(getVMEevnt(1, "v2", i));
+    }
+
+    verify(ctx, times(1)).reconfigureVertex(
+      anyInt(), any(VertexLocationHint.class), edgePropertiesCaptor.capture());
+    Map<String, EdgeProperty> edgeProperties = edgePropertiesCaptor.getValue();
+    verifyEdgeProperties(edgeProperties.get("v0"), new String[]{"v0", "g0"}, new int[]{10, 31}, 10, 0);
+    verifyEdgeProperties(edgeProperties.get("v1"), new String[]{"v0", "g0"}, new int[]{10, 31}, 30, 0);
+    verifyEdgeProperties(edgeProperties.get("v2"), new String[]{"v0", "g0"}, new int[]{10, 31}, 1, 30);
+
+    vertexManager.onVertexStarted(null);
+    verifyScheduleRequest(0);
+    vertexManager.onSourceTaskCompleted(getTaId("v0", 0));
+    vertexManager.onSourceTaskCompleted(getTaId("v1", 10));
+    vertexManager.onSourceTaskCompleted(getTaId("v2", 0));
+    verifyScheduleRequest(0);
+    vertexManager.onSourceTaskCompleted(getTaId("v0", 1));
+    verifyScheduleRequest(1, 10);
+    for (int i = 1; i < 40; i++) {
+      vertexManager.onSourceTaskCompleted(getTaId("v2", i));
+    }
+    verifyScheduleRequest(2, 30);
   }
 
-  private void testOnVertexStartHelper(boolean broadcastRunning) throws Exception {
+  @Test(timeout = 5000)
+  public void testDAGVertexGroupOnly() throws Exception {
+    setupDAGVertexGroupOnly(false);
+
     vertexManager.onVertexStateUpdated(new VertexStateUpdate("v0", VertexState.CONFIGURED));
+    vertexManager.onVertexStateUpdated(new VertexStateUpdate("v2", VertexState.CONFIGURED));
+    verify(ctx, never()).reconfigureVertex(
+      anyInt(), any(VertexLocationHint.class), anyMapOf(String.class, EdgeProperty.class));
+
     vertexManager.onVertexStateUpdated(new VertexStateUpdate("v1", VertexState.CONFIGURED));
-    if (broadcastRunning) {
-      vertexManager.onVertexStateUpdated(new VertexStateUpdate("v2", VertexState.RUNNING));
+    vertexManager.onVertexStateUpdated(new VertexStateUpdate("v3", VertexState.CONFIGURED));
+    verify(ctx, times(1)).reconfigureVertex(parallelismCaptor.capture(),
+      isNull(VertexLocationHint.class), edgePropertiesCaptor.capture());
+    assertEquals(45, (int) parallelismCaptor.getValue());
+    Map<String, EdgeProperty> edgeProperties = edgePropertiesCaptor.getValue();
+
+    verifyEdgeProperties(edgeProperties.get("v0"), new String[]{"g0", "g1"}, new int[]{5, 9}, 2, 0);
+    verifyEdgeProperties(edgeProperties.get("v1"), new String[]{"g0", "g1"}, new int[]{5, 9}, 3, 2);
+    verifyEdgeProperties(edgeProperties.get("v2"), new String[]{"g0", "g1"}, new int[]{5, 9}, 4, 0);
+    verifyEdgeProperties(edgeProperties.get("v3"), new String[]{"g0", "g1"}, new int[]{5, 9}, 5, 4);
+
+    vertexManager.onVertexStarted(null);
+    verifyScheduleRequest(0);
+    vertexManager.onSourceTaskCompleted(getTaId("v0", 1));
+    vertexManager.onSourceTaskCompleted(getTaId("v2", 3));
+    verifyScheduleRequest(1, 12);
+    vertexManager.onSourceTaskCompleted(getTaId("v1", 2));
+    verifyScheduleRequest(2, 39);
+    vertexManager.onSourceTaskCompleted(getTaId("v3", 0));
+    verifyScheduleRequest(3, 13, 40);
+  }
+
+  @Test(timeout = 5000)
+  public void testDAGVertexGroupOnlyWithGrouping() throws Exception {
+    setupDAGVertexGroupOnly(true);
+
+    for (int i = 0; i < 4; i++) {
+      vertexManager.onVertexStateUpdated(new VertexStateUpdate("v" + i, VertexState.CONFIGURED));
     }
 
-    List<TaskAttemptIdentifier> completions = new ArrayList<>();
-    completions.add(allCompletions.get(0));
-    completions.add(allCompletions.get(2));
-    completions.add(allCompletions.get(5));
-    vertexManager.onVertexStarted(completions);
+    vertexManager.onVertexManagerEventReceived(getVMEevnt(desiredBytesPerGroup, "v0", 0));
+    vertexManager.onVertexManagerEventReceived(getVMEevnt(desiredBytesPerGroup, "v2", 0));
+    verify(ctx, never()).reconfigureVertex(
+      anyInt(), any(VertexLocationHint.class), anyMapOf(String.class, EdgeProperty.class));
 
-    if (!broadcastRunning) {
-      verify(context, never()).scheduleTasks(Matchers.<List<ScheduleTaskRequest>>any());
-      vertexManager.onVertexStateUpdated(new VertexStateUpdate("v2", VertexState.RUNNING));
+    vertexManager.onVertexManagerEventReceived(getVMEevnt(0, "v0", 1));
+    for (int i = 0; i < 5; i++) {
+      vertexManager.onVertexManagerEventReceived(getVMEevnt(desiredBytesPerGroup/5, "v1", i));
+    }
+    for (int i = 0; i < 50; i++) {
+      vertexManager.onVertexManagerEventReceived(getVMEevnt(1, "v3", i));
     }
 
-    verify(context, times(1)).scheduleTasks(scheduleTaskRequestCaptor.capture());
-    List<ScheduleTaskRequest> requests = scheduleTaskRequestCaptor.getValue();
-    assertNotNull(requests);
-    assertEquals(1, requests.size());
-    assertEquals(0, requests.get(0).getTaskIndex());
+    verify(ctx, times(1)).reconfigureVertex(
+      anyInt(), any(VertexLocationHint.class), edgePropertiesCaptor.capture());
+    Map<String, EdgeProperty> edgeProperties = edgePropertiesCaptor.getValue();
+    verifyEdgeProperties(edgeProperties.get("v0"), new String[]{"g0", "g1"}, new int[]{16, 41}, 10, 0);
+    verifyEdgeProperties(edgeProperties.get("v1"), new String[]{"g0", "g1"}, new int[]{16, 41}, 6, 10);
+    verifyEdgeProperties(edgeProperties.get("v2"), new String[]{"g0", "g1"}, new int[]{16, 41}, 40, 0);
+    verifyEdgeProperties(edgeProperties.get("v3"), new String[]{"g0", "g1"}, new int[]{16, 41}, 1, 40);
+
+    vertexManager.onVertexStarted(null);
+    verifyScheduleRequest(0);
+    vertexManager.onSourceTaskCompleted(getTaId("v0", 1));
+    vertexManager.onSourceTaskCompleted(getTaId("v2", 20));
+    verifyScheduleRequest(0);
+    vertexManager.onSourceTaskCompleted(getTaId("v0", 0));
+    verifyScheduleRequest(1, 20);
+    vertexManager.onSourceTaskCompleted(getTaId("v3", 0));
+    verifyScheduleRequest(1);
+    for (int i = 1; i < 50; i++) {
+      vertexManager.onSourceTaskCompleted(getTaId("v3", i));
+    }
+    verifyScheduleRequest(2, 40);
+    vertexManager.onSourceTaskCompleted(getTaId("v1", 5));
+    verifyScheduleRequest(2);
+    for (int i = 6; i < 10; i++) {
+      vertexManager.onSourceTaskCompleted(getTaId("v1", i));
+    }
+    verifyScheduleRequest(3, 471, 491);
   }
 
   @Test(timeout = 5000)
-  public void testOnVertexStartWithBroadcastRunning() throws Exception {
-    testOnVertexStartHelper(true);
+  public void testSchedulingVertexOnlyWithBroadcast() throws Exception {
+    setupDAGVertexOnlyWithBroadcast();
+    vertexManager.onVertexStateUpdated(new VertexStateUpdate("v0", VertexState.CONFIGURED));
+    vertexManager.onVertexStateUpdated(new VertexStateUpdate("v1", VertexState.CONFIGURED));
+    vertexManager.onVertexStarted(null);
+
+    verifyScheduleRequest(0);
+    vertexManager.onSourceTaskCompleted(getTaId("v0", 0));
+    vertexManager.onSourceTaskCompleted(getTaId("v1", 1));
+    verifyScheduleRequest(0);
+    vertexManager.onVertexStateUpdated(new VertexStateUpdate("v2", VertexState.RUNNING));
+    verifyScheduleRequest(1);
+    verify(ctx, times(1)).scheduleTasks(scheduleRequestCaptor.capture());
   }
 
   @Test(timeout = 5000)
-  public void testOnVertexStartWithoutBroadcastRunning() throws Exception {
-    testOnVertexStartHelper(false);
+  public void testOnVertexStart() throws Exception {
+    setupDAGVertexOnly(false);
+    vertexManager.onVertexStateUpdated(new VertexStateUpdate("v0", VertexState.CONFIGURED));
+    vertexManager.onVertexStateUpdated(new VertexStateUpdate("v1", VertexState.CONFIGURED));
 
+    vertexManager.onVertexStarted(Arrays.asList(getTaId("v0", 0), getTaId("v1", 0)));
+    verifyScheduleRequest(1, 0);
   }
 
   @Test(timeout = 5000)
   public void testZeroSrcTask() throws Exception {
-    context = mock(VertexManagerPluginContext.class);
-    vertexManager = new CartesianProductVertexManagerUnpartitioned(context);
-    when(context.getVertexNumTasks(eq("v0"))).thenReturn(2);
-    when(context.getVertexNumTasks(eq("v1"))).thenReturn(0);
+    ctx = mock(VertexManagerPluginContext.class);
+    vertexManager = new CartesianProductVertexManagerUnpartitioned(ctx);
+    when(ctx.getVertexNumTasks(eq("v0"))).thenReturn(2);
+    when(ctx.getVertexNumTasks(eq("v1"))).thenReturn(0);
 
     CartesianProductVertexManagerConfig config =
       new CartesianProductVertexManagerConfig(
@@ -216,144 +448,13 @@ public class TestCartesianProductVertexManagerUnpartitioned {
       CartesianProductEdgeManager.class.getName()), null, null, null, null));
     edgePropertyMap.put("v1", EdgeProperty.create(EdgeManagerPluginDescriptor.create(
       CartesianProductEdgeManager.class.getName()), null, null, null, null));
-    when(context.getInputVertexEdgeProperties()).thenReturn(edgePropertyMap);
+    when(ctx.getInputVertexEdgeProperties()).thenReturn(edgePropertyMap);
 
     vertexManager.initialize(config);
-    allCompletions = new ArrayList<>();
-    allCompletions.add(new TaskAttemptIdentifierImpl("dag", "v0",
-      TezTaskAttemptID.getInstance(TezTaskID.getInstance(TezVertexID.getInstance(
-        TezDAGID.getInstance("0", 0, 0), 0), 0), 0)));
-    allCompletions.add(new TaskAttemptIdentifierImpl("dag", "v0",
-      TezTaskAttemptID.getInstance(TezTaskID.getInstance(TezVertexID.getInstance(
-        TezDAGID.getInstance("0", 0, 0), 0), 1), 0)));
-
     vertexManager.onVertexStateUpdated(new VertexStateUpdate("v0", VertexState.CONFIGURED));
     vertexManager.onVertexStateUpdated(new VertexStateUpdate("v1", VertexState.CONFIGURED));
     vertexManager.onVertexStarted(new ArrayList<TaskAttemptIdentifier>());
-    vertexManager.onSourceTaskCompleted(allCompletions.get(0));
-    vertexManager.onSourceTaskCompleted(allCompletions.get(1));
-  }
-
-  @Test(timeout = 5000)
-  public void testAutoGrouping() throws Exception {
-    testAutoGroupingHelper(false);
-    testAutoGroupingHelper(true);
-  }
-
-  private void testAutoGroupingHelper(boolean enableAutoGrouping) throws Exception {
-    int numTaskV0 = 20;
-    int numTaskV1 = 10;
-    long desiredBytesPerGroup = 1000;
-    long outputBytesPerTaskV0 = 500;
-    long outputBytesPerTaskV1 = 10;
-    int expectedNumGroupV0 = 10;
-    int expectedNumGroupV1 = 1;
-    ArgumentCaptor<Integer> parallelismCaptor = ArgumentCaptor.forClass(Integer.class);
-    CartesianProductVertexManagerConfig config = new CartesianProductVertexManagerConfig(
-      false, new String[]{"v0","v1"}, null, 0, 0, enableAutoGrouping, desiredBytesPerGroup, null);
-    Map<String, EdgeProperty> edgePropertyMap = new HashMap<>();
-    EdgeProperty edgeProperty = EdgeProperty.create(EdgeManagerPluginDescriptor.create(
-      CartesianProductEdgeManager.class.getName()), null, null, null, null);
-    edgePropertyMap.put("v0", edgeProperty);
-    edgePropertyMap.put("v1", edgeProperty);
-    edgePropertyMap.put("v2", EdgeProperty.create(BROADCAST, null, null, null, null));
-    when(context.getInputVertexEdgeProperties()).thenReturn(edgePropertyMap);
-    when(context.getVertexNumTasks(eq("v0"))).thenReturn(2);
-    when(context.getVertexNumTasks(eq("v1"))).thenReturn(3);
-
-    context = mock(VertexManagerPluginContext.class);
-    vertexManager = new CartesianProductVertexManagerUnpartitioned(context);
-    when(context.getVertexNumTasks(eq("v0"))).thenReturn(numTaskV0);
-    when(context.getVertexNumTasks(eq("v1"))).thenReturn(numTaskV1);
-    when(context.getInputVertexEdgeProperties()).thenReturn(edgePropertyMap);
-
-    vertexManager.initialize(config);
-    vertexManager.onVertexStateUpdated(new VertexStateUpdate("v0", VertexState.CONFIGURED));
-    vertexManager.onVertexStateUpdated(new VertexStateUpdate("v1", VertexState.CONFIGURED));
-    vertexManager.onVertexStateUpdated(new VertexStateUpdate("v2", VertexState.RUNNING));
-    if (!enableAutoGrouping) {
-      // auto grouping disabled, shouldn't auto group
-      verify(context, times(1)).reconfigureVertex(parallelismCaptor.capture(),
-        isNull(VertexLocationHint.class), edgePropertiesCaptor.capture());
-      assertEquals(numTaskV0 * numTaskV1, parallelismCaptor.getValue().intValue());
-      return;
-    }
-
-    // not enough input size, shouldn't auto group
-    verify(context, never()).reconfigureVertex(anyInt(), any(VertexLocationHint.class),
-      anyMapOf(String.class, EdgeProperty.class));
-
-    // only v0 reach threshold or finish all task, shouldn't auto group
-    VertexManagerEventPayloadProto.Builder builder = VertexManagerEventPayloadProto.newBuilder();
-    builder.setOutputSize(outputBytesPerTaskV0);
-    VertexManagerEventPayloadProto proto = builder.build();
-    VertexManagerEvent vmEvent =
-      VertexManagerEvent.create("cp vertex", proto.toByteString().asReadOnlyByteBuffer());
-
-    for (int i = 0; i < desiredBytesPerGroup/outputBytesPerTaskV0; i++) {
-      vmEvent.setProducerAttemptIdentifier(
-        new TaskAttemptIdentifierImpl("dag", "v0", TezTaskAttemptID.fromString(
-          String.format("attempt_1441301219877_0109_1_00_%06d_0", i))));
-      vertexManager.onVertexManagerEventReceived(vmEvent);
-    }
-    verify(context, never()).reconfigureVertex(anyInt(), any(VertexLocationHint.class),
-      anyMapOf(String.class, EdgeProperty.class));
-
-    // vmEvent from broadcast vertex shouldn't matter
-    vmEvent.setProducerAttemptIdentifier(new TaskAttemptIdentifierImpl("dag", "v2",
-        TezTaskAttemptID.fromString("attempt_1441301219877_0109_1_00_000000_0")));
-    vertexManager.onVertexManagerEventReceived(vmEvent);
-
-    // v1 finish all tasks but still doesn't reach threshold, auto group anyway
-    proto = builder.setOutputSize(outputBytesPerTaskV1).build();
-    vmEvent = VertexManagerEvent.create("cp vertex", proto.toByteString().asReadOnlyByteBuffer());
-    for (int i = 0; i < numTaskV1; i++) {
-      verify(context, never()).reconfigureVertex(anyInt(), any(VertexLocationHint.class),
-        anyMapOf(String.class, EdgeProperty.class));
-      vmEvent.setProducerAttemptIdentifier(
-        new TaskAttemptIdentifierImpl("dag", "v1", TezTaskAttemptID.fromString(
-          String.format("attempt_1441301219877_0109_1_01_%06d_0", i))));
-      vertexManager.onVertexManagerEventReceived(vmEvent);
-    }
-    verify(context, times(1)).reconfigureVertex(parallelismCaptor.capture(),
-      isNull(VertexLocationHint.class), edgePropertiesCaptor.capture());
-    Map<String, EdgeProperty> edgeProperties = edgePropertiesCaptor.getValue();
-    for (EdgeProperty property : edgeProperties.values()) {
-      UserPayload payload = property.getEdgeManagerDescriptor().getUserPayload();
-      CartesianProductEdgeManagerConfig newConfig =
-        CartesianProductEdgeManagerConfig.fromUserPayload(payload);
-      assertArrayEquals(new int[]{numTaskV0, numTaskV1}, newConfig.getNumTasks());
-      assertArrayEquals(new int[]{expectedNumGroupV0,expectedNumGroupV1}, newConfig.getNumGroups());
-    }
-
-    assertEquals(expectedNumGroupV0 * expectedNumGroupV1, parallelismCaptor.getValue().intValue());
-    for (EdgeProperty property : edgePropertiesCaptor.getValue().values()) {
-      CartesianProductEdgeManagerConfig emConfig =
-        CartesianProductEdgeManagerConfig.fromUserPayload(
-          property.getEdgeManagerDescriptor().getUserPayload());
-      assertArrayEquals(new int[] {numTaskV0, numTaskV1}, emConfig.getNumTasks());
-      assertArrayEquals(new int[] {expectedNumGroupV0, expectedNumGroupV1}, emConfig.getNumGroups());
-    }
-
-    vertexManager.onVertexStarted(null);
-    // v0 t0 finish, shouldn't schedule
-    vertexManager.onSourceTaskCompleted(allCompletions.get(0));
-    verify(context, never()).scheduleTasks(Matchers.<List<ScheduleTaskRequest>>any());
-
-    // v1 all task finish, shouldn't schedule
-    for (int i = 0; i < numTaskV1; i++) {
-      vertexManager.onSourceTaskCompleted(new TaskAttemptIdentifierImpl("dag", "v1",
-        TezTaskAttemptID.getInstance(TezTaskID.getInstance(TezVertexID.getInstance(
-          TezDAGID.getInstance("0", 0, 0), 1), i), 0)));
-      verify(context, never()).scheduleTasks(Matchers.<List<ScheduleTaskRequest>>any());
-    }
-
-    // v0 t1 finish, should schedule
-    vertexManager.onSourceTaskCompleted(allCompletions.get(1));
-    verify(context, times(1)).scheduleTasks(scheduleTaskRequestCaptor.capture());
-    List<ScheduleTaskRequest> requests = scheduleTaskRequestCaptor.getValue();
-    assertNotNull(requests);
-    assertEquals(1, requests.size());
-    assertEquals(0, requests.get(0).getTaskIndex());
+    vertexManager.onSourceTaskCompleted(getTaId("v0", 0));
+    vertexManager.onSourceTaskCompleted(getTaId("v0", 1));
   }
 }
\ No newline at end of file


[2/2] tez git commit: TEZ-3654. Make CartesianProduct edge work with GroupInputEdge (zhiyuany)

Posted by zh...@apache.org.
TEZ-3654. Make CartesianProduct edge work with GroupInputEdge (zhiyuany)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/f355a050
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/f355a050
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/f355a050

Branch: refs/heads/master
Commit: f355a050cf012854d1f7145d05f1981a3cf97e67
Parents: e0ee28a
Author: Zhiyuan Yang <zh...@apache.org>
Authored: Mon Apr 10 11:18:33 2017 -0700
Committer: Zhiyuan Yang <zh...@apache.org>
Committed: Mon Apr 10 11:18:33 2017 -0700

----------------------------------------------------------------------
 .../tez/dag/api/EdgeManagerPluginContext.java   |   4 +
 .../tez/dag/api/VertexManagerPluginContext.java |   9 +
 .../java/org/apache/tez/dag/app/dag/Vertex.java |   2 +-
 .../org/apache/tez/dag/app/dag/impl/Edge.java   |  12 +
 .../apache/tez/dag/app/dag/impl/VertexImpl.java |   4 +-
 .../tez/dag/app/dag/impl/VertexManager.java     |  14 +
 .../apache/tez/dag/app/dag/impl/TestEdge.java   |  32 ++
 .../tez/dag/app/dag/impl/TestVertexImpl.java    |   6 +-
 .../tez/dag/app/dag/impl/TestVertexManager.java |  26 +
 .../CartesianProductCombination.java            |  78 ++-
 .../CartesianProductConfig.java                 |  60 +-
 .../CartesianProductEdgeManagerConfig.java      |  39 +-
 .../CartesianProductEdgeManagerPartitioned.java |   8 +-
 .../CartesianProductEdgeManagerReal.java        |   1 -
 ...artesianProductEdgeManagerUnpartitioned.java |  57 +-
 .../CartesianProductVertexManager.java          |  51 +-
 .../CartesianProductVertexManagerConfig.java    |  40 +-
 ...artesianProductVertexManagerPartitioned.java |  10 +-
 ...tesianProductVertexManagerUnpartitioned.java | 316 +++++++----
 .../main/proto/CartesianProductPayload.proto    |   9 +-
 .../TestShuffleVertexManagerUtils.java          |   5 +
 .../TestCartesianProductCombination.java        |  30 +-
 .../TestCartesianProductConfig.java             |   6 +-
 .../TestCartesianProductEdgeManager.java        |   6 +-
 .../TestCartesianProductEdgeManagerConfig.java  |  15 +-
 ...tCartesianProductEdgeManagerPartitioned.java |  13 +-
 ...artesianProductEdgeManagerUnpartitioned.java | 408 +++++++------
 .../TestCartesianProductVertexManager.java      |  19 +
 ...TestCartesianProductVertexManagerConfig.java |   8 +-
 ...tesianProductVertexManagerUnpartitioned.java | 565 +++++++++++--------
 30 files changed, 1099 insertions(+), 754 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/f355a050/tez-api/src/main/java/org/apache/tez/dag/api/EdgeManagerPluginContext.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/EdgeManagerPluginContext.java b/tez-api/src/main/java/org/apache/tez/dag/api/EdgeManagerPluginContext.java
index 79f685d..ef6925b 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/EdgeManagerPluginContext.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/EdgeManagerPluginContext.java
@@ -58,4 +58,8 @@ public interface EdgeManagerPluginContext {
    */
   public int getDestinationVertexNumTasks();
 
+  /**
+   * @return the name of vertex group that source vertex belongs to, or null
+   */
+  String getVertexGroupName();
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/f355a050/tez-api/src/main/java/org/apache/tez/dag/api/VertexManagerPluginContext.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/VertexManagerPluginContext.java b/tez-api/src/main/java/org/apache/tez/dag/api/VertexManagerPluginContext.java
index aa99745..b858a65 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/VertexManagerPluginContext.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/VertexManagerPluginContext.java
@@ -350,4 +350,13 @@ public interface VertexManagerPluginContext {
   // TODO must be done later after TEZ-1714
   //public void vertexManagerDone();
 
+  /**
+   * Get input vertex groups of this vertex, including vertex group name and
+   * all members vertex name
+   *
+   * @return map whose key is vertex group name and value is list of members' name,
+   *         or empty map if there is no input vertex group.
+   */
+  Map<String, List<String>> getInputVertexGroups();
+
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tez/blob/f355a050/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java
index 1b3b39c..51847d4 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java
@@ -149,7 +149,7 @@ public interface Vertex extends Comparable<Vertex> {
   List<InputSpec> getInputSpecList(int taskIndex) throws AMUserCodeException;
   List<OutputSpec> getOutputSpecList(int taskIndex) throws AMUserCodeException;
   
-  List<GroupInputSpec> getGroupInputSpecList(int taskIndex);
+  List<GroupInputSpec> getGroupInputSpecList();
   void addSharedOutputs(Set<String> outputs);
   Set<String> getSharedOutputs();
 

http://git-wip-us.apache.org/repos/asf/tez/blob/f355a050/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/Edge.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/Edge.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/Edge.java
index 690df63..f78c9a5 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/Edge.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/Edge.java
@@ -28,6 +28,7 @@ import java.util.zip.Deflater;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.tez.common.TezCommonUtils;
 import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.runtime.api.impl.GroupInputSpec;
 import org.apache.tez.runtime.library.common.shuffle.ShuffleUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -129,6 +130,17 @@ public class Edge {
       return destinationVertex.getTotalTasks();
     }
 
+    @Override
+    public String getVertexGroupName() {
+      if (destinationVertex.getGroupInputSpecList() != null) {
+        for (GroupInputSpec group : destinationVertex.getGroupInputSpecList()) {
+          if (group.getGroupVertices().contains(getSourceVertexName())) {
+            return group.getGroupName();
+          }
+        }
+      }
+      return null;
+    }
   }
 
   private EdgeProperty edgeProperty;

http://git-wip-us.apache.org/repos/asf/tez/blob/f355a050/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
index 1ab3da8..ab17fe4 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
@@ -1581,7 +1581,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
     return TaskSpec.createBaseTaskSpec(getDAG().getName(),
         getName(), getTotalTasks(), getProcessorDescriptor(),
         getInputSpecList(taskIndex), getOutputSpecList(taskIndex), 
-        getGroupInputSpecList(taskIndex), vertexOnlyConf);
+        getGroupInputSpecList(), vertexOnlyConf);
   }
 
   @Override
@@ -4422,7 +4422,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
 
 
   @Override
-  public List<GroupInputSpec> getGroupInputSpecList(int taskIndex) {
+  public List<GroupInputSpec> getGroupInputSpecList() {
     readLock.lock();
     try {
       return groupInputSpecList;

http://git-wip-us.apache.org/repos/asf/tez/blob/f355a050/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java
index 45f72bd..b7d3428 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java
@@ -24,6 +24,7 @@ import java.lang.reflect.UndeclaredThrowableException;
 import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -35,6 +36,7 @@ import javax.annotation.Nullable;
 
 import org.apache.commons.lang.exception.ExceptionUtils;
 import org.apache.tez.dag.app.dag.event.DAGEventInternalError;
+import org.apache.tez.runtime.api.impl.GroupInputSpec;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -360,6 +362,18 @@ public class VertexManager {
     }
 
     @Override
+    public Map<String, List<String>> getInputVertexGroups() {
+      checkAndThrowIfDone();
+      Map<String, List<String>> inputGroups = Maps.newHashMap();
+      if (managedVertex.getGroupInputSpecList() != null) {
+        for (GroupInputSpec group : managedVertex.getGroupInputSpecList()) {
+          inputGroups.put(group.getGroupName(), Collections.unmodifiableList(group.getGroupVertices()));
+        }
+      }
+      return inputGroups;
+    }
+
+    @Override
     public void onStateUpdated(VertexStateUpdate event) {
       // this is not called by the vertex manager plugin. 
       // no need to synchronize this. similar to other external notification methods

http://git-wip-us.apache.org/repos/asf/tez/blob/f355a050/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestEdge.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestEdge.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestEdge.java
index f53e505..1143395 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestEdge.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestEdge.java
@@ -19,6 +19,7 @@
 package org.apache.tez.dag.app.dag.impl;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.doReturn;
@@ -34,6 +35,7 @@ import java.io.DataOutput;
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.LinkedHashMap;
@@ -67,7 +69,9 @@ import org.apache.tez.runtime.api.events.DataMovementEvent;
 import org.apache.tez.runtime.api.events.InputReadErrorEvent;
 import org.apache.tez.runtime.api.impl.EventMetaData;
 import org.apache.tez.runtime.api.impl.EventMetaData.EventProducerConsumerType;
+import org.apache.tez.runtime.api.impl.GroupInputSpec;
 import org.apache.tez.runtime.api.impl.TezEvent;
+import org.apache.tez.test.EdgeManagerForTest;
 import org.junit.Assert;
 import org.junit.Test;
 import org.mockito.ArgumentCaptor;
@@ -476,4 +480,32 @@ public class TestEdge {
       return emConf.sourceTaskIndex;
     }
   }
+
+  @Test(timeout = 5000)
+  public void testEdgeManagerPluginCtxGetVertexGroupName() throws TezException {
+    EdgeManagerPluginDescriptor edgeManagerDescriptor =
+      EdgeManagerPluginDescriptor.create(EdgeManagerForTest.class.getName());
+    EdgeProperty edgeProp = EdgeProperty.create(edgeManagerDescriptor,
+      DataSourceType.PERSISTED, SchedulingType.SEQUENTIAL, OutputDescriptor.create("Out"),
+      InputDescriptor.create("In"));
+    Edge edge = new Edge(edgeProp, null, null);
+
+    Vertex srcV = mock(Vertex.class), destV = mock(Vertex.class);
+    String srcName = "srcV", destName = "destV";
+    when(srcV.getName()).thenReturn(srcName);
+    when(destV.getName()).thenReturn(destName);
+    edge.setSourceVertex(srcV);
+    edge.setDestinationVertex(destV);
+
+    assertNull(edge.edgeManager.getContext().getVertexGroupName());
+
+    String group = "group";
+    when(destV.getGroupInputSpecList())
+      .thenReturn(Arrays.asList(new GroupInputSpec(group, Arrays.asList("v1", "v3"), null)));
+    assertNull(edge.edgeManager.getContext().getVertexGroupName());
+
+    when(destV.getGroupInputSpecList())
+      .thenReturn(Arrays.asList(new GroupInputSpec(group, Arrays.asList(srcName, "v3"), null)));
+    assertEquals(group, edge.edgeManager.getContext().getVertexGroupName());
+  }
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/f355a050/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
index 90d675e..bc06fd0 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
@@ -5940,10 +5940,10 @@ public class TestVertexImpl {
         VertexEventType.V_INIT));
     dispatcher.await();
     
-    Assert.assertNull(vA.getGroupInputSpecList(0));
-    Assert.assertNull(vB.getGroupInputSpecList(0));
+    Assert.assertNull(vA.getGroupInputSpecList());
+    Assert.assertNull(vB.getGroupInputSpecList());
     
-    List<GroupInputSpec> groupInSpec = vC.getGroupInputSpecList(0);
+    List<GroupInputSpec> groupInSpec = vC.getGroupInputSpecList();
     Assert.assertEquals(1, groupInSpec.size());
     Assert.assertEquals("Group", groupInSpec.get(0).getGroupName());
     assertTrue(groupInSpec.get(0).getGroupVertices().contains("A"));

http://git-wip-us.apache.org/repos/asf/tez/blob/f355a050/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexManager.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexManager.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexManager.java
index b93b298..6bec26e 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexManager.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexManager.java
@@ -20,6 +20,8 @@ package org.apache.tez.dag.app.dag.impl;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
 import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
 import static org.mockito.Mockito.doReturn;
@@ -30,6 +32,8 @@ import static org.mockito.Mockito.when;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.LinkedList;
@@ -55,6 +59,7 @@ import org.apache.tez.runtime.api.Event;
 import org.apache.tez.runtime.api.TaskAttemptIdentifier;
 import org.apache.tez.runtime.api.events.InputDataInformationEvent;
 import org.apache.tez.runtime.api.events.VertexManagerEvent;
+import org.apache.tez.runtime.api.impl.GroupInputSpec;
 import org.apache.tez.runtime.api.impl.TezEvent;
 import org.junit.Before;
 import org.junit.Test;
@@ -205,6 +210,27 @@ public class TestVertexManager {
     assertEquals(Sets.newHashSet("input1","input2"), edgeVertexSet);
   }
 
+  @Test(timeout = 5000)
+  public void testVMPluginCtxGetInputVertexGroup() throws Exception {
+    VertexManager vm =
+      new VertexManager(
+        VertexManagerPluginDescriptor.create(CustomVertexManager.class
+          .getName()), UserGroupInformation.getCurrentUser(),
+        mockVertex, mockAppContext, mock(StateChangeNotifier.class));
+
+    assertTrue(vm.pluginContext.getInputVertexGroups().isEmpty());
+
+    String group = "group", v1 = "v1", v2 = "v2";
+    when(mockVertex.getGroupInputSpecList())
+      .thenReturn(Arrays.asList(new GroupInputSpec(group, Arrays.asList(v1, v2), null)));
+    Map<String, List<String>> groups = vm.pluginContext.getInputVertexGroups();
+    assertEquals(1, groups.size());
+    assertTrue(groups.containsKey(group));
+    assertEquals(2, groups.get(group).size());
+    assertTrue(groups.get(group).contains(v1));
+    assertTrue(groups.get(group).contains(v2));
+  }
+
   public static class CustomVertexManager extends VertexManagerPlugin {
 
     private Map<String,List<Event>> cachedEventMap = new HashMap<String, List<Event>>();

http://git-wip-us.apache.org/repos/asf/tez/blob/f355a050/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductCombination.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductCombination.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductCombination.java
index c6c95f2..97f3eb2 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductCombination.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductCombination.java
@@ -25,49 +25,47 @@ import java.util.Collections;
 import java.util.List;
 
 /**
- * Represent the combination of source partitions or tasks.
+ * Represent the combination of source chunks. A chunk is one or more source tasks or partitions.
  *
- * For example, if we have two source vertices and each generates two partition, we will have 2*2=4
- * destination tasks. The mapping from source partition/task to destination task is like this:
+ * For example, if we have two source vertices and each generates two chunks, we will have 2*2=4
+ * destination tasks. The mapping from source chunks to destination task is like this:
  * <0, 0> -> 0, <0, 1> -> 1, <1, 0> -> 2, <1, 1> -> 3;
  *
- * Basically, it stores the source partition/task combination and can compute corresponding
+ * Basically, it stores the source chunk id combination and can compute corresponding
  * destination task. It can also figure out the source combination from a given destination task.
- * Task id is mapped in the ascending order of combinations, starting from 0. <field>factor</field>
- * is the helper array to computer task id, so task id = (combination) dot-product (factor)
+ * Task id is mapped in the ascending order of combinations, starting from 0.
  *
  * You can traverse all combinations with <method>firstTask</method> and <method>nextTask</method>,
  * like <0, 0> -> <0, 1> -> <1, 0> -> <1, 1>.
  *
- * Or you can also traverse all combinations that has one specific partition with
- * <method>firstTaskWithFixedPartition</method> and <method>nextTaskWithFixedPartition</method>,
+ * Or you can also traverse all combinations that has one specific chunk with
+ * <method>firstTaskWithFixedChunk</method> and <method>nextTaskWithFixedChunk</method>,
  * like <0, 1, 0> -> <0, 1, 1> -> <1, 1, 0> -> <1, 1, 1> (all combinations with 2nd vertex's 2nd
- * partition.
+ * chunk.
  */
 class CartesianProductCombination {
-  // numPartitions for partitioned case, numTasks for unpartitioned case
-  private int[] numPartitionOrTask;
-  // at which position (in source vertices array) our vertex is
+  private int[] numChunk;
+  // which position (in source vertices array) we care about
   private int positionId = -1;
-  // The i-th element Ci represents partition/task Ci of source vertex i.
+  // The i-th element Ci represents chunk Ci of source vertex i.
   private final Integer[] combination;
-  // the weight of each vertex when computing the task id
+  // helper array to computer task id: task id = (combination) dot-product (factor)
   private final Integer[] factor;
 
-  public CartesianProductCombination(int[] numPartitionOrTask) {
-    Preconditions.checkArgument(!Ints.contains(numPartitionOrTask, 0),
-      "CartesianProductCombination doesn't allow zero partition or task");
-    this.numPartitionOrTask = Arrays.copyOf(numPartitionOrTask, numPartitionOrTask.length);
-    combination = new Integer[numPartitionOrTask.length];
-    factor = new Integer[numPartitionOrTask.length];
+  public CartesianProductCombination(int[] numChunk) {
+    Preconditions.checkArgument(!Ints.contains(numChunk, 0),
+      "CartesianProductCombination doesn't allow zero chunk");
+    this.numChunk = Arrays.copyOf(numChunk, numChunk.length);
+    combination = new Integer[numChunk.length];
+    factor = new Integer[numChunk.length];
     factor[factor.length-1] = 1;
     for (int i = combination.length-2; i >= 0; i--) {
-      factor[i] = factor[i+1]*numPartitionOrTask[i+1];
+      factor[i] = factor[i+1]* numChunk[i+1];
     }
   }
 
-  public CartesianProductCombination(int[] numPartitionOrTask, int positionId) {
-    this(numPartitionOrTask);
+  public CartesianProductCombination(int[] numChunk, int positionId) {
+    this(numChunk);
     this.positionId = positionId;
   }
 
@@ -79,24 +77,24 @@ class CartesianProductCombination {
   }
 
   /**
-   * first combination with given partition id in current position
-   * @param partition
+   * first combination with given chunk id in current position
+   * @param chunkId
    */
-  public void firstTaskWithFixedPartition(int partition) {
+  public void firstTaskWithFixedChunk(int chunkId) {
     Preconditions.checkArgument(positionId >= 0 && positionId < combination.length);
     Arrays.fill(combination, 0);
-    combination[positionId] = partition;
+    combination[positionId] = chunkId;
   }
 
   /**
-   * next combination without current partition in current position
+   * next combination without current chunk in current position
    * @return false if there is no next combination
    */
-  public boolean nextTaskWithFixedPartition() {
+  public boolean nextTaskWithFixedChunk() {
     Preconditions.checkArgument(positionId >= 0 && positionId < combination.length);
     int i;
     for (i = combination.length-1; i >= 0; i--) {
-      if (i != positionId && combination[i] != numPartitionOrTask[i]-1) {
+      if (i != positionId && combination[i] != numChunk[i]-1) {
         break;
       }
     }
@@ -117,20 +115,20 @@ class CartesianProductCombination {
   }
 
   /**
-   * first combination with given partition id in current position
+   * first combination with given chunk id in current position
    */
   public void firstTask() {
     Arrays.fill(combination, 0);
   }
 
   /**
-   * next combination without current partition in current position
+   * next combination without current chunk in current position
    * @return false if there is no next combination
    */
   public boolean nextTask() {
     int i;
     for (i = combination.length-1; i >= 0; i--) {
-      if (combination[i] != numPartitionOrTask[i]-1) {
+      if (combination[i] != numChunk[i]-1) {
         break;
       }
     }
@@ -145,19 +143,19 @@ class CartesianProductCombination {
   }
 
   /**
-   * @return corresponding task id for current combination
+   * @return corresponding chunk id for current combination
    */
-  public int getTaskId() {
-    int taskId = 0;
+  public int getChunkId() {
+    int chunkId = 0;
     for (int i = 0; i < combination.length; i++) {
-      taskId += combination[i]*factor[i];
+      chunkId += combination[i]*factor[i];
     }
-    return taskId;
+    return chunkId;
   }
 
-  public static CartesianProductCombination fromTaskId(int[] numPartitionOrTask,
+  public static CartesianProductCombination fromTaskId(int[] numChunk,
                                                        int taskId) {
-    CartesianProductCombination result = new CartesianProductCombination(numPartitionOrTask);
+    CartesianProductCombination result = new CartesianProductCombination(numChunk);
     for (int i = 0; i < result.combination.length; i++) {
       result.combination[i] = taskId/result.factor[i];
       taskId %= result.factor[i];

http://git-wip-us.apache.org/repos/asf/tez/blob/f355a050/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductConfig.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductConfig.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductConfig.java
index b57ed84..12a17cb 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductConfig.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductConfig.java
@@ -46,21 +46,23 @@ import static org.apache.tez.runtime.library.cartesianproduct.CartesianProductUs
 @Evolving
 public class CartesianProductConfig {
   private final boolean isPartitioned;
-  private final String[] sourceVertices;
+  private final String[] sources;
+ // numPartition[i] means how many partitions sourceVertices[i] will generate
+ // (not used in unpartitioned case)
   private final int[] numPartitions;
   private final CartesianProductFilterDescriptor filterDescriptor;
 
   /**
    * create config for unpartitioned case
-   * @param sourceVertices list of source vertices names
+   * @param sources list of names of source vertices or vertex groups
    */
-  public CartesianProductConfig(List<String> sourceVertices) {
-    Preconditions.checkArgument(sourceVertices != null, "source vertices list cannot be null");
-    Preconditions.checkArgument(sourceVertices.size() > 1,
-      "there must be more than 1 source " + "vertices, currently only " + sourceVertices.size());
+  public CartesianProductConfig(List<String> sources) {
+    Preconditions.checkArgument(sources != null, "source list cannot be null");
+    Preconditions.checkArgument(sources.size() > 1,
+      "there must be more than 1 source " + ", currently only " + sources.size());
 
     this.isPartitioned = false;
-    this.sourceVertices = sourceVertices.toArray(new String[sourceVertices.size()]);
+    this.sources = sources.toArray(new String[sources.size()]);
     this.numPartitions = null;
     this.filterDescriptor = null;
   }
@@ -86,12 +88,12 @@ public class CartesianProductConfig {
 
     this.isPartitioned = true;
     this.numPartitions = new int[vertexPartitionMap.size()];
-    this.sourceVertices = new String[vertexPartitionMap.size()];
+    this.sources = new String[vertexPartitionMap.size()];
     this.filterDescriptor = filterDescriptor;
 
     int i = 0;
     for (Map.Entry<String, Integer> entry : vertexPartitionMap.entrySet()) {
-      this.sourceVertices[i] = entry.getKey();
+      this.sources[i] = entry.getKey();
       this.numPartitions[i] = entry.getValue();
       i++;
     }
@@ -102,23 +104,23 @@ public class CartesianProductConfig {
   /**
    * create config for partitioned case, with specified source vertices order
    * @param numPartitions
-   * @param sourceVertices
+   * @param sources
    * @param filterDescriptor
    */
   @VisibleForTesting
-  protected CartesianProductConfig(int[] numPartitions, String[] sourceVertices,
+  protected CartesianProductConfig(int[] numPartitions, String[] sources,
                                    CartesianProductFilterDescriptor filterDescriptor) {
     Preconditions.checkArgument(numPartitions != null, "partitions count array can't be null");
-    Preconditions.checkArgument(sourceVertices != null, "source vertices array can't be null");
-    Preconditions.checkArgument(numPartitions.length == sourceVertices.length,
-      "partitions count array(length: " + numPartitions.length + ") and source vertices array " +
-        "(length: " + sourceVertices.length + ") cannot have different length");
-    Preconditions.checkArgument(sourceVertices.length > 1,
-      "there must be more than 1 source " + "vertices, currently only " + sourceVertices.length);
+    Preconditions.checkArgument(sources != null, "source array can't be null");
+    Preconditions.checkArgument(numPartitions.length == sources.length,
+      "partitions count array(length: " + numPartitions.length + ") and source array " +
+        "(length: " + sources.length + ") cannot have different length");
+    Preconditions.checkArgument(sources.length > 1,
+      "there must be more than 1 source " + ", currently only " + sources.length);
 
     this.isPartitioned = true;
     this.numPartitions = numPartitions;
-    this.sourceVertices = sourceVertices;
+    this.sources = sources;
     this.filterDescriptor = filterDescriptor;
 
     checkNumPartitions();
@@ -128,11 +130,11 @@ public class CartesianProductConfig {
    * create config for both cases, used by subclass
    */
   protected CartesianProductConfig(boolean isPartitioned, int[] numPartitions,
-                                   String[] sourceVertices,
+                                   String[] sources,
                                    CartesianProductFilterDescriptor filterDescriptor) {
     this.isPartitioned = isPartitioned;
     this.numPartitions = numPartitions;
-    this.sourceVertices = sourceVertices;
+    this.sources = sources;
     this.filterDescriptor = filterDescriptor;
   }
 
@@ -142,11 +144,11 @@ public class CartesianProductConfig {
       boolean isUnpartitioned = true;
       for (int i = 0; i < numPartitions.length; i++) {
         Preconditions.checkArgument(this.numPartitions[i] > 0,
-          "Vertex " + sourceVertices[i] + "has negative (" + numPartitions[i] + ") partitions");
+          "Vertex " + sources[i] + "has negative (" + numPartitions[i] + ") partitions");
         isUnpartitioned = isUnpartitioned && numPartitions[i] == 1;
       }
       Preconditions.checkArgument(!isUnpartitioned,
-        "every source vertex has 1 partition in a partitioned case");
+        "every source has 1 partition in a partitioned case");
     } else {
       Preconditions.checkArgument(this.numPartitions == null,
         "partition counts should be null in unpartitioned case");
@@ -154,10 +156,10 @@ public class CartesianProductConfig {
   }
 
   /**
-   * @return the array of source vertices names
+   * @return the array of source vertices (or source vertex group) names
    */
   public List<String> getSourceVertices() {
-    return Collections.unmodifiableList(Arrays.asList(sourceVertices));
+    return Collections.unmodifiableList(Arrays.asList(sources));
   }
 
   /**
@@ -187,7 +189,7 @@ public class CartesianProductConfig {
     CartesianProductConfigProto.Builder builder =
       CartesianProductConfigProto.newBuilder();
     builder.setIsPartitioned(this.isPartitioned)
-      .addAllSourceVertices(Arrays.asList(sourceVertices));
+      .addAllSources(Arrays.asList(sources));
 
     if (isPartitioned) {
       builder.addAllNumPartitions(Ints.asList(numPartitions));
@@ -220,7 +222,7 @@ public class CartesianProductConfig {
       String desiredBytesPerGroup =
         conf.get(CartesianProductVertexManager.TEZ_CARTESIAN_PRODUCT_DESIRED_BYTES_PER_GROUP);
       if (desiredBytesPerGroup != null) {
-        builder.setDesiredBytesPerGroup(Long.parseLong(desiredBytesPerGroup));
+        builder.setDesiredBytesPerChunk(Long.parseLong(desiredBytesPerGroup));
       }
     }
     Preconditions.checkArgument(builder.getMinFraction() <= builder.getMaxFraction(),
@@ -246,10 +248,10 @@ public class CartesianProductConfig {
   protected static CartesianProductConfig fromProto(
     CartesianProductConfigProto proto) {
     if (!proto.getIsPartitioned()) {
-      return new CartesianProductConfig(proto.getSourceVerticesList());
+      return new CartesianProductConfig(proto.getSourcesList());
     } else {
-      String[] sourceVertices = new String[proto.getSourceVerticesList().size()];
-      proto.getSourceVerticesList().toArray(sourceVertices);
+      String[] sourceVertices = new String[proto.getSourcesList().size()];
+      proto.getSourcesList().toArray(sourceVertices);
       CartesianProductFilterDescriptor filterDescriptor = null;
       if (proto.hasFilterClassName()) {
         filterDescriptor = new CartesianProductFilterDescriptor(proto.getFilterClassName());

http://git-wip-us.apache.org/repos/asf/tez/blob/f355a050/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductEdgeManagerConfig.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductEdgeManagerConfig.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductEdgeManagerConfig.java
index 0347f67..df0bcfa 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductEdgeManagerConfig.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductEdgeManagerConfig.java
@@ -27,23 +27,18 @@ import java.nio.ByteBuffer;
 import static org.apache.tez.runtime.library.cartesianproduct.CartesianProductUserPayload.CartesianProductConfigProto;
 
 class CartesianProductEdgeManagerConfig extends CartesianProductConfig {
-  private final int[] numTasks;
-  private final int[] numGroups;
+  final int[] numChunksPerSrc;
+  final int numChunk;
+  final int chunkIdOffset;
 
   protected CartesianProductEdgeManagerConfig(boolean isPartitioned, String[] sourceVertices,
-                                            int[] numPartitions, int[] numTasks, int[] numGroups,
-                                            CartesianProductFilterDescriptor filterDescriptor) {
+                                              int[] numPartitions, int[] numChunksPerSrc, int numChunk,
+                                              int chunkIdOffset,
+                                              CartesianProductFilterDescriptor filterDescriptor) {
     super(isPartitioned, numPartitions, sourceVertices, filterDescriptor);
-    this.numTasks = numTasks;
-    this.numGroups = numGroups;
-  }
-
-  public int[] getNumTasks() {
-    return this.numTasks;
-  }
-
-  public int[] getNumGroups() {
-    return this.numGroups;
+    this.numChunksPerSrc = numChunksPerSrc;
+    this.numChunk = numChunk;
+    this.chunkIdOffset = chunkIdOffset;
   }
 
   public static CartesianProductEdgeManagerConfig fromUserPayload(UserPayload payload)
@@ -52,8 +47,8 @@ class CartesianProductEdgeManagerConfig extends CartesianProductConfig {
       CartesianProductConfigProto.parseFrom(ByteString.copyFrom(payload.getPayload()));
 
     boolean isPartitioned = proto.getIsPartitioned();
-    String[] sourceVertices = new String[proto.getSourceVerticesList().size()];
-    proto.getSourceVerticesList().toArray(sourceVertices);
+    String[] sources = new String[proto.getSourcesList().size()];
+    proto.getSourcesList().toArray(sources);
     int[] numPartitions =
       proto.getNumPartitionsCount() == 0 ? null : Ints.toArray(proto.getNumPartitionsList());
     CartesianProductFilterDescriptor filterDescriptor = proto.hasFilterClassName()
@@ -62,11 +57,11 @@ class CartesianProductEdgeManagerConfig extends CartesianProductConfig {
       filterDescriptor.setUserPayload(
         UserPayload.create(ByteBuffer.wrap(proto.getFilterUserPayload().toByteArray())));
     }
-    int[] numTasks =
-      proto.getNumTasksCount() == 0 ? null : Ints.toArray(proto.getNumTasksList());
-    int[] numGroups =
-      proto.getNumGroupsCount() == 0 ? null : Ints.toArray(proto.getNumGroupsList());
-    return new CartesianProductEdgeManagerConfig(isPartitioned, sourceVertices, numPartitions,
-      numTasks, numGroups, filterDescriptor);
+    int[] humChunksPerSrc =
+      proto.getNumChunksCount() == 0 ? null : Ints.toArray(proto.getNumChunksList());
+    int numChunk = proto.getNumChunk();
+    int chunkIdOffset = proto.getChunkIdOffset();
+    return new CartesianProductEdgeManagerConfig(isPartitioned, sources, numPartitions,
+      humChunksPerSrc, numChunk, chunkIdOffset, filterDescriptor);
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tez/blob/f355a050/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductEdgeManagerPartitioned.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductEdgeManagerPartitioned.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductEdgeManagerPartitioned.java
index 068da81..5ece5cf 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductEdgeManagerPartitioned.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductEdgeManagerPartitioned.java
@@ -107,13 +107,13 @@ class CartesianProductEdgeManagerPartitioned extends CartesianProductEdgeManager
     CartesianProductCombination combination =
       new CartesianProductCombination(numPartitions);
     combination.firstTask();
-    List<String> sourceVertices = config.getSourceVertices();
+    List<String> sources = config.getSourceVertices();
     do {
-      for (int i = 0; i < sourceVertices.size(); i++) {
-        vertexPartitionMap.put(sourceVertices.get(i), combination.getCombination().get(i));
+      for (int i = 0; i < sources.size(); i++) {
+        vertexPartitionMap.put(sources.get(i), combination.getCombination().get(i));
       }
       if (filter == null || filter.isValidCombination(vertexPartitionMap)) {
-        idealTaskId.add(combination.getTaskId());
+        idealTaskId.add(combination.getChunkId());
       }
     } while (combination.nextTask());
     this.taskIdMapping = Ints.toArray(idealTaskId);

http://git-wip-us.apache.org/repos/asf/tez/blob/f355a050/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductEdgeManagerReal.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductEdgeManagerReal.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductEdgeManagerReal.java
index 3e1407c..f22035b 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductEdgeManagerReal.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductEdgeManagerReal.java
@@ -18,7 +18,6 @@
 package org.apache.tez.runtime.library.cartesianproduct;
 
 import org.apache.tez.dag.api.EdgeManagerPluginContext;
-import org.apache.tez.dag.api.EdgeManagerPluginOnDemand;
 import org.apache.tez.dag.api.EdgeManagerPluginOnDemand.EventRouteMetadata;
 import org.apache.tez.dag.api.EdgeManagerPluginOnDemand.CompositeEventRouteMetadata;
 

http://git-wip-us.apache.org/repos/asf/tez/blob/f355a050/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductEdgeManagerUnpartitioned.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductEdgeManagerUnpartitioned.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductEdgeManagerUnpartitioned.java
index b9cb155..80d7dc1 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductEdgeManagerUnpartitioned.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductEdgeManagerUnpartitioned.java
@@ -27,7 +27,9 @@ import javax.annotation.Nullable;
 
 class CartesianProductEdgeManagerUnpartitioned extends CartesianProductEdgeManagerReal {
   private int positionId;
-  private int[] numGroups;
+  private int numChunk;
+  private int chunkIdOffset;
+  private int[] numChunkPerSrc;
   private int numDestinationConsumerTasks;
   private Grouper grouper = new Grouper();
 
@@ -36,32 +38,38 @@ class CartesianProductEdgeManagerUnpartitioned extends CartesianProductEdgeManag
   }
 
   public void initialize(CartesianProductEdgeManagerConfig config) {
-    positionId = config.getSourceVertices().indexOf(getContext().getSourceVertexName());
-    this.numGroups = config.getNumGroups();
+    String groupName = getContext().getVertexGroupName();
+    String srcName = groupName != null ? groupName : getContext().getSourceVertexName();
+    this.positionId = config.getSourceVertices().indexOf(srcName);
+    this.numChunkPerSrc = config.numChunksPerSrc;
+    this.numChunk = config.numChunk;
+    this.chunkIdOffset = config.chunkIdOffset;
 
-    if (numGroups != null && numGroups[positionId] != 0) {
-      grouper.init(config.getNumTasks()[positionId], numGroups[positionId]);
+    if (numChunk != 0) {
+      grouper.init(getContext().getSourceVertexNumTasks(), numChunk);
       numDestinationConsumerTasks = 1;
-      for (int numGroup : numGroups) {
+      for (int numGroup : numChunkPerSrc) {
         numDestinationConsumerTasks *= numGroup;
       }
-      numDestinationConsumerTasks /= numGroups[positionId];
+      numDestinationConsumerTasks /= numChunkPerSrc[positionId];
     }
   }
 
   @Override
   public int routeInputErrorEventToSource(int destTaskId, int failedInputId) throws Exception {
     return failedInputId + grouper.getFirstTaskInGroup(
-      CartesianProductCombination.fromTaskId(numGroups, destTaskId).getCombination().get(positionId));
+      CartesianProductCombination.fromTaskId(numChunkPerSrc, destTaskId).getCombination().get(positionId)
+        - chunkIdOffset);
   }
 
   @Override
   public EventRouteMetadata routeDataMovementEventToDestination(int srcTaskId, int srcOutputId,
                                                                 int destTaskId) throws Exception {
-    int groupId =
-      CartesianProductCombination.fromTaskId(numGroups, destTaskId).getCombination().get(positionId);
-    if (grouper.isInGroup(srcTaskId, groupId)) {
-      int idx = srcTaskId - grouper.getFirstTaskInGroup(groupId);
+    int chunkId =
+      CartesianProductCombination.fromTaskId(numChunkPerSrc, destTaskId).getCombination().get(positionId)
+        - chunkIdOffset;
+    if (0 <= chunkId && chunkId < numChunk && grouper.isInGroup(srcTaskId, chunkId)) {
+      int idx = srcTaskId - grouper.getFirstTaskInGroup(chunkId);
       return EventRouteMetadata.create(1, new int[] {idx});
     }
     return null;
@@ -72,10 +80,11 @@ class CartesianProductEdgeManagerUnpartitioned extends CartesianProductEdgeManag
   public CompositeEventRouteMetadata routeCompositeDataMovementEventToDestination(int srcTaskId,
                                                                                   int destTaskId)
     throws Exception {
-    int groupId =
-      CartesianProductCombination.fromTaskId(numGroups, destTaskId).getCombination().get(positionId);
-    if (grouper.isInGroup(srcTaskId, groupId)) {
-      int idx = srcTaskId - grouper.getFirstTaskInGroup(groupId);
+    int chunkId =
+      CartesianProductCombination.fromTaskId(numChunkPerSrc, destTaskId).getCombination().get(positionId)
+        - chunkIdOffset;
+    if (0 <= chunkId && chunkId < numChunk && grouper.isInGroup(srcTaskId, chunkId)) {
+      int idx = srcTaskId - grouper.getFirstTaskInGroup(chunkId);
       return CompositeEventRouteMetadata.create(1, idx, 0);
     }
     return null;
@@ -86,10 +95,11 @@ class CartesianProductEdgeManagerUnpartitioned extends CartesianProductEdgeManag
   public EventRouteMetadata routeInputSourceTaskFailedEventToDestination(int srcTaskId,
                                                                          int destTaskId)
     throws Exception {
-    int groupId =
-      CartesianProductCombination.fromTaskId(numGroups, destTaskId).getCombination().get(positionId);
-    if (grouper.isInGroup(srcTaskId, groupId)) {
-      int idx = srcTaskId - grouper.getFirstTaskInGroup(groupId);
+    int chunkId =
+      CartesianProductCombination.fromTaskId(numChunkPerSrc, destTaskId).getCombination().get(positionId)
+        - chunkIdOffset;
+    if (0 <= chunkId && chunkId < numChunk && grouper.isInGroup(srcTaskId, chunkId)) {
+      int idx = srcTaskId - grouper.getFirstTaskInGroup(chunkId);
       return EventRouteMetadata.create(1, new int[] {idx});
     }
     return null;
@@ -97,9 +107,10 @@ class CartesianProductEdgeManagerUnpartitioned extends CartesianProductEdgeManag
 
   @Override
   public int getNumDestinationTaskPhysicalInputs(int destTaskId) {
-    int groupId =
-      CartesianProductCombination.fromTaskId(numGroups, destTaskId).getCombination().get(positionId);
-    return grouper.getNumTasksInGroup(groupId);
+    int chunkId =
+      CartesianProductCombination.fromTaskId(numChunkPerSrc, destTaskId).getCombination().get(positionId)
+        - chunkIdOffset;
+    return 0 <= chunkId && chunkId < numChunk ? grouper.getNumTasksInGroup(chunkId) : 0;
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/tez/blob/f355a050/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductVertexManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductVertexManager.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductVertexManager.java
index a104904..857f11e 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductVertexManager.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductVertexManager.java
@@ -30,6 +30,8 @@ import org.apache.tez.runtime.api.Event;
 import org.apache.tez.runtime.api.TaskAttemptIdentifier;
 import org.apache.tez.runtime.api.events.VertexManagerEvent;
 
+import java.util.Collections;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -45,6 +47,13 @@ import static org.apache.tez.dag.api.EdgeProperty.DataMovementType.CUSTOM;
  *
  * Predefined parallelism isn't allowed for cartesian product vertex. Parallellism has to be
  * determined by vertex manager.
+ *
+ * If a vertex uses this vertex manager, its input edges must be either cartesian product edges
+ * or broadcast edges.
+ *
+ * Sources can be either vertices or vertex groups (only in unpartitioned case).
+ *
+ * Slow start only works in partitioned case. Auto grouping only works in unpartitioned case.
  */
 public class CartesianProductVertexManager extends VertexManagerPlugin {
   /**
@@ -95,26 +104,37 @@ public class CartesianProductVertexManager extends VertexManagerPlugin {
     // check whether DAG and config are is consistent
     Map<String, EdgeProperty> edgePropertyMap = getContext().getInputVertexEdgeProperties();
     Set<String> sourceVerticesDAG = edgePropertyMap.keySet();
-    Set<String> sourceVerticesConfig = new HashSet<>();
-    sourceVerticesConfig.addAll(config.getSourceVertices());
+    Set<String> sourceVerticesConfig = new HashSet<>(config.getSourceVertices());
+
+    Map<String, List<String>> vertexGroups = getContext().getInputVertexGroups();
+    Map<String, String> vertexToGroup = new HashMap<>();
+    for (Map.Entry<String, List<String>> group : vertexGroups.entrySet()) {
+      for (String vertex : group.getValue()) {
+        vertexToGroup.put(vertex, group.getKey());
+      }
+    }
 
     for (Map.Entry<String, EdgeProperty> entry : edgePropertyMap.entrySet()) {
       String vertex = entry.getKey();
+      String group = vertexToGroup.get(vertex);
       EdgeProperty edgeProperty = entry.getValue();
       EdgeManagerPluginDescriptor empDescriptor = edgeProperty.getEdgeManagerDescriptor();
       if (empDescriptor != null
         && empDescriptor.getClassName().equals(CartesianProductEdgeManager.class.getName())) {
-        Preconditions.checkArgument(sourceVerticesConfig.contains(vertex),
+        Preconditions.checkArgument(
+          sourceVerticesConfig.contains(vertex) || sourceVerticesConfig.contains(group),
           vertex + " has CartesianProductEdgeManager but isn't in " +
             "CartesianProductVertexManagerConfig");
       } else {
-        Preconditions.checkArgument(!sourceVerticesConfig.contains(vertex),
+        Preconditions.checkArgument(
+          !sourceVerticesConfig.contains(vertex) && !sourceVerticesConfig.contains(group),
           vertex + " has no CartesianProductEdgeManager but is in " +
             "CartesianProductVertexManagerConfig");
       }
 
       if (edgeProperty.getDataMovementType() == CUSTOM) {
-        Preconditions.checkArgument(sourceVerticesConfig.contains(vertex),
+        Preconditions.checkArgument(
+          sourceVerticesConfig.contains(vertex) || sourceVerticesConfig.contains(group),
           "Only broadcast and cartesian product edges are allowed in cartesian product vertex");
       } else {
         Preconditions.checkArgument(edgeProperty.getDataMovementType() == BROADCAST,
@@ -122,14 +142,19 @@ public class CartesianProductVertexManager extends VertexManagerPlugin {
       }
     }
 
-    for (String vertex : sourceVerticesConfig) {
-      Preconditions.checkArgument(sourceVerticesDAG.contains(vertex),
-        vertex + " is in CartesianProductVertexManagerConfig but not a source vertex in DAG");
-      Preconditions.checkArgument(
-        edgePropertyMap.get(vertex).getEdgeManagerDescriptor().getClassName()
-          .equals(CartesianProductEdgeManager.class.getName()),
-        vertex + " is in CartesianProductVertexManagerConfig and a source vertex, but has no " +
-          "CartesianProductEdgeManager");
+    for (String src : sourceVerticesConfig) {
+      List<String> vertices =
+        vertexGroups.containsKey(src) ? vertexGroups.get(src) : Collections.singletonList(src);
+      for (String v : vertices) {
+        Preconditions.checkArgument(
+          sourceVerticesDAG.contains(v),
+          v + " is in CartesianProductVertexManagerConfig but not a source vertex in DAG");
+        Preconditions.checkArgument(
+          edgePropertyMap.get(v).getEdgeManagerDescriptor().getClassName()
+            .equals(CartesianProductEdgeManager.class.getName()),
+          v + " is in CartesianProductVertexManagerConfig and a source vertex, but has no " +
+            "CartesianProductEdgeManager");
+      }
     }
 
     vertexManagerReal = config.getIsPartitioned()

http://git-wip-us.apache.org/repos/asf/tez/blob/f355a050/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductVertexManagerConfig.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductVertexManagerConfig.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductVertexManagerConfig.java
index f43f494..e082ec3 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductVertexManagerConfig.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductVertexManagerConfig.java
@@ -28,32 +28,24 @@ import java.nio.ByteBuffer;
 import static org.apache.tez.runtime.library.cartesianproduct.CartesianProductUserPayload.*;
 
 class CartesianProductVertexManagerConfig extends CartesianProductConfig {
-  private final float minFraction;
-  private final float maxFraction;
-  private final boolean enableAutoGrouping;
-  private final long desiredBytesPerGroup;
+  final float minFraction;
+  final float maxFraction;
+  final boolean enableAutoGrouping;
+  final long desiredBytesPerChunk;
 
-  public CartesianProductVertexManagerConfig(boolean isPartitioned, String[] sourceVertices,
+  public CartesianProductVertexManagerConfig(boolean isPartitioned, String[] sources,
                                              int[] numPartitions,
                                              float minFraction, float maxFraction,
-                                             boolean enableAutoGrouping, long desiredBytesPerGroup,
+                                             boolean enableAutoGrouping, long desiredBytesPerChunk,
                                              CartesianProductFilterDescriptor filterDescriptor) {
-    super(isPartitioned, numPartitions, sourceVertices, filterDescriptor);
+    super(isPartitioned, numPartitions, sources, filterDescriptor);
     Preconditions.checkArgument(minFraction <= maxFraction,
       "min fraction(" + minFraction + ") should be less than max fraction(" +
         maxFraction  + ") in cartesian product slow start");
     this.minFraction = minFraction;
     this.maxFraction = maxFraction;
     this.enableAutoGrouping = enableAutoGrouping;
-    this.desiredBytesPerGroup = desiredBytesPerGroup;
-  }
-
-  public float getMinFraction() {
-    return minFraction;
-  }
-
-  public float getMaxFraction() {
-    return maxFraction;
+    this.desiredBytesPerChunk = desiredBytesPerChunk;
   }
 
   public static CartesianProductVertexManagerConfig fromUserPayload(UserPayload payload)
@@ -62,8 +54,8 @@ class CartesianProductVertexManagerConfig extends CartesianProductConfig {
       CartesianProductConfigProto.parseFrom(ByteString.copyFrom(payload.getPayload()));
 
     boolean isPartitioned = proto.getIsPartitioned();
-    String[] sourceVertices = new String[proto.getSourceVerticesList().size()];
-    proto.getSourceVerticesList().toArray(sourceVertices);
+    String[] sources = new String[proto.getSourcesList().size()];
+    proto.getSourcesList().toArray(sources);
     int[] numPartitions =
       proto.getNumPartitionsCount() == 0 ? null : Ints.toArray(proto.getNumPartitionsList());
     CartesianProductFilterDescriptor filterDescriptor = proto.hasFilterClassName()
@@ -77,17 +69,9 @@ class CartesianProductVertexManagerConfig extends CartesianProductConfig {
 
     boolean enableAutoGrouping = proto.hasEnableAutoGrouping() ? proto.getEnableAutoGrouping()
       : CartesianProductVertexManager.TEZ_CARTESIAN_PRODUCT_ENABLE_AUTO_GROUPING_DEFAULT;
-    long desiredBytesPerGroup = proto.hasDesiredBytesPerGroup() ? proto.getDesiredBytesPerGroup()
+    long desiredBytesPerGroup = proto.hasDesiredBytesPerChunk() ? proto.getDesiredBytesPerChunk()
       : CartesianProductVertexManager.TEZ_CARTESIAN_PRODUCT_DESIRED_BYTES_PER_GROUP_DEFAULT;
-    return new CartesianProductVertexManagerConfig(isPartitioned, sourceVertices, numPartitions,
+    return new CartesianProductVertexManagerConfig(isPartitioned, sources, numPartitions,
       minFraction, maxFraction, enableAutoGrouping, desiredBytesPerGroup, filterDescriptor);
   }
-
-  public boolean isEnableAutoGrouping() {
-    return enableAutoGrouping;
-  }
-
-  public long getDesiredBytesPerGroup() {
-    return desiredBytesPerGroup;
-  }
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/f355a050/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductVertexManagerPartitioned.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductVertexManagerPartitioned.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductVertexManagerPartitioned.java
index 85c04d2..ddff37d 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductVertexManagerPartitioned.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductVertexManagerPartitioned.java
@@ -174,12 +174,12 @@ class CartesianProductVertexManagerPartitioned extends CartesianProductVertexMan
     // determine the destination task with largest id to schedule
     float percentFinishedSrcTask = numFinishedSrcTasks*1f/totalNumSrcTasks;
     int numTaskToSchedule;
-    if (percentFinishedSrcTask < config.getMinFraction()) {
+    if (percentFinishedSrcTask < config.minFraction) {
       numTaskToSchedule = 0;
-    } else if (config.getMinFraction() <= percentFinishedSrcTask &&
-        percentFinishedSrcTask <= config.getMaxFraction()) {
-      numTaskToSchedule = (int) ((percentFinishedSrcTask-config.getMinFraction())
-        /(config.getMaxFraction()-config.getMinFraction())*parallelism);
+    } else if (config.minFraction <= percentFinishedSrcTask &&
+        percentFinishedSrcTask <= config.maxFraction) {
+      numTaskToSchedule = (int) ((percentFinishedSrcTask-config.minFraction)
+        /(config.maxFraction-config.minFraction)*parallelism);
     } else {
       numTaskToSchedule = parallelism;
     }

http://git-wip-us.apache.org/repos/asf/tez/blob/f355a050/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductVertexManagerUnpartitioned.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductVertexManagerUnpartitioned.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductVertexManagerUnpartitioned.java
index 993cb40..46ea76e 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductVertexManagerUnpartitioned.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductVertexManagerUnpartitioned.java
@@ -19,7 +19,6 @@ package org.apache.tez.runtime.library.cartesianproduct;
 
 import com.google.common.primitives.Ints;
 import com.google.protobuf.ByteString;
-import org.apache.tez.dag.api.EdgeManagerPluginDescriptor;
 import org.apache.tez.dag.api.EdgeProperty;
 import org.apache.tez.dag.api.UserPayload;
 import org.apache.tez.dag.api.VertexManagerPluginContext;
@@ -36,7 +35,6 @@ import org.slf4j.Logger;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -50,31 +48,132 @@ import java.util.Set;
 import static org.apache.tez.dag.api.EdgeProperty.DataMovementType.CUSTOM;
 import static org.apache.tez.runtime.library.cartesianproduct.CartesianProductUserPayload.CartesianProductConfigProto;
 
+/**
+ * In the unpartitioned case, there is one destination task for each combination of source chunks.
+ * A source is a source vertex or a source vertex group. A chunk is one source task (without auto
+ * grouping) or a group of source tasks (with auto grouping). The source tasks of a vertex group
+ * come from several vertices; their chunks are numbered consecutively across the group's vertices.
+ * The mapping from a source chunk combination to a destination task id is done by
+ * {@link CartesianProductCombination}.
+ *
+ * If auto grouping is enabled, this vertex manager estimates the output size of each source vertex
+ * and groups its tasks into chunks according to the desired chunk size configured by the user.
+ *
+ */
 class CartesianProductVertexManagerUnpartitioned extends CartesianProductVertexManagerReal {
+  /**
+   * A cartesian product source: a single source vertex or a vertex group of source vertices.
+   */
+  static class Source {
+    // list of source vertices of this source
+    List<SrcVertex> srcVertices = new ArrayList<>();
+    // position of this source in all sources
+    int position;
+    // name of source vertex or vertex group
+    String name;
+
+    // total number of chunks in this source
+    public int getNumChunk() {
+      int numChunk = 0;
+      for (SrcVertex srcV : srcVertices) {
+        numChunk += srcV.numChunk;
+      }
+      return numChunk;
+    }
+
+    // whether this source has any task completed
+    public boolean hasTaskCompleted() {
+      for (SrcVertex srcV : srcVertices) {
+        if (!srcV.taskCompleted.isEmpty()) {
+          return true;
+        }
+      }
+      return false;
+    }
+
+    /** Human-readable description of this source and its vertices, for logging. */
+    public String toString(boolean afterReconfigure) {
+      StringBuilder sb = new StringBuilder();
+      sb.append("Source at position ").append(position);
+      if (name != null) {
+        sb.append(", vertex group ").append(name);
+      }
+      sb.append(": {");
+      // emit the separator before every element except the first, rather than trimming a
+      // trailing ", " afterwards, so the output stays well-formed ("{}") for an empty list
+      String separator = "";
+      for (SrcVertex srcV : srcVertices) {
+        sb.append(separator).append('[');
+        sb.append(srcV.toString(afterReconfigure));
+        sb.append(']');
+        separator = ", ";
+      }
+      sb.append('}');
+      return sb.toString();
+    }
+  }
+
+  /**
+   * a cartesian product source vertex
+   */
+  class SrcVertex {
+    // which source this vertex belongs to
+    Source source;
+    // vertex name
+    String name;
+    int numTask;
+    // num chunks of this source vertex
+    int numChunk;
+    // offset of chunk id in vertex group
+    // we need sequence chunks in the vertex group to make them look like from single vertex
+    int chunkIdOffset = 0;
+    RoaringBitmap taskCompleted = new RoaringBitmap();
+    RoaringBitmap taskWithVMEvent = new RoaringBitmap();
+    long outputBytes;
+
+    public void doGrouping() {
+      numChunk = numTask;
+      if (config.enableAutoGrouping) {
+        outputBytes = outputBytes * numTask / taskWithVMEvent.getCardinality();
+        numChunk = Math.min(numChunk,
+          (int) ((outputBytes + config.desiredBytesPerChunk - 1) / config.desiredBytesPerChunk));
+      }
+    }
+
+    public String toString(boolean afterReconfigure) {
+      StringBuilder sb = new StringBuilder();
+      sb.append("vertex ").append(name).append(", ");
+      if (afterReconfigure) {
+        sb.append("estimated output ").append(outputBytes).append(" bytes, ");
+        sb.append(numChunk).append(" chunks");
+      } else {
+        sb.append(numTask).append(" tasks, ");
+        sb.append(taskWithVMEvent.getCardinality()).append(" VMEvents, ");
+        sb.append("output ").append(outputBytes).append(" bytes");
+      }
+      return sb.toString();
+    }
+  }
+
   private static final Logger LOG =
     org.slf4j.LoggerFactory.getLogger(CartesianProductVertexManagerUnpartitioned.class);
 
-  List<String> sourceVertices;
-  private int parallelism = 1;
+  CartesianProductVertexManagerConfig config;
+  Map<String, Source> sourcesByName = new HashMap<>();
+  Map<String, SrcVertex> srcVerticesByName = new HashMap<>();
+
   private boolean vertexReconfigured = false;
   private boolean vertexStarted = false;
   private boolean vertexStartSchedule = false;
   private int numCPSrcNotInConfigureState = 0;
   private int numBroadcastSrcNotInRunningState = 0;
-  private int[] numTasks;
-
   private Queue<TaskAttemptIdentifier> completedSrcTaskToProcess = new LinkedList<>();
-  private Map<String, RoaringBitmap> sourceTaskCompleted = new HashMap<>();
   private RoaringBitmap scheduledTasks = new RoaringBitmap();
-  private CartesianProductConfig config;
 
   /* auto reduce related */
-  private int[] numGroups;
+  // num of chunks of source at the corresponding position in source list
+  private int[] numChunksPerSrc;
   private Set<String> vertexSentVME = new HashSet<>();
-  private long[] vertexOutputBytes;
-  private int[] numVertexManagerEventsReceived;
-  private long desiredBytesPerGroup;
-  private boolean enableGrouping;
   private Grouper grouper = new Grouper();
 
   public CartesianProductVertexManagerUnpartitioned(VertexManagerPluginContext context) {
@@ -83,29 +182,39 @@ class CartesianProductVertexManagerUnpartitioned extends CartesianProductVertexM
 
   @Override
   public void initialize(CartesianProductVertexManagerConfig config) throws Exception {
-    sourceVertices = config.getSourceVertices();
-    numTasks = new int[sourceVertices.size()];
-    numGroups = new int[sourceVertices.size()];
-    vertexOutputBytes = new long[sourceVertices.size()];
-    numVertexManagerEventsReceived = new int[sourceVertices.size()];
-
-    enableGrouping = config.isEnableAutoGrouping();
-    desiredBytesPerGroup = config.getDesiredBytesPerGroup();
-
-    for (String vertex : sourceVertices) {
-      sourceTaskCompleted.put(vertex, new RoaringBitmap());
-    }
-
-    for (String vertex : getContext().getInputVertexEdgeProperties().keySet()) {
-      if (sourceVertices.indexOf(vertex) != -1) {
-        sourceTaskCompleted.put(vertex, new RoaringBitmap());
-        getContext().registerForVertexStateUpdates(vertex, EnumSet.of(VertexState.CONFIGURED));
+    for (Map.Entry<String, EdgeProperty> e : getContext().getInputVertexEdgeProperties().entrySet()) {
+      if (e.getValue().getDataMovementType() == CUSTOM
+        && e.getValue().getEdgeManagerDescriptor().getClassName()
+          .equals(CartesianProductEdgeManager.class.getName())) {
+        srcVerticesByName.put(e.getKey(), new SrcVertex());
+        srcVerticesByName.get(e.getKey()).name = e.getKey();
+        getContext().registerForVertexStateUpdates(e.getKey(), EnumSet.of(VertexState.CONFIGURED));
         numCPSrcNotInConfigureState++;
       } else {
-        getContext().registerForVertexStateUpdates(vertex, EnumSet.of(VertexState.RUNNING));
+        getContext().registerForVertexStateUpdates(e.getKey(), EnumSet.of(VertexState.RUNNING));
         numBroadcastSrcNotInRunningState++;
       }
     }
+
+    Map<String, List<String>> srcGroups = getContext().getInputVertexGroups();
+    for (int i = 0; i < config.getSourceVertices().size(); i++) {
+      String srcName = config.getSourceVertices().get(i);
+      Source source = new Source();
+      source.position = i;
+      if (srcGroups.containsKey(srcName)) {
+        source.name = srcName;
+        for (String srcVName : srcGroups.get(srcName)) {
+          source.srcVertices.add(srcVerticesByName.get(srcVName));
+          srcVerticesByName.get(srcVName).source = source;
+        }
+      } else {
+        source.srcVertices.add(srcVerticesByName.get(srcName));
+        srcVerticesByName.get(srcName).source = source;
+      }
+      sourcesByName.put(srcName, source);
+    }
+
+    numChunksPerSrc = new int[sourcesByName.size()];
     this.config = config;
     getContext().vertexReconfigurationPlanned();
   }
@@ -128,7 +237,7 @@ class CartesianProductVertexManagerUnpartitioned extends CartesianProductVertexM
     VertexState state = stateUpdate.getVertexState();
 
     if (state == VertexState.CONFIGURED) {
-      numTasks[sourceVertices.indexOf(vertex)] = getContext().getVertexNumTasks(vertex);
+      srcVerticesByName.get(vertex).numTask = getContext().getVertexNumTasks(vertex);
       numCPSrcNotInConfigureState--;
     } else if (state == VertexState.RUNNING) {
       numBroadcastSrcNotInRunningState--;
@@ -145,22 +254,20 @@ class CartesianProductVertexManagerUnpartitioned extends CartesianProductVertexM
   private void addCompletedSrcTaskToProcess(TaskAttemptIdentifier attempt) {
     int taskId = attempt.getTaskIdentifier().getIdentifier();
     String vertex = attempt.getTaskIdentifier().getVertexIdentifier().getName();
-    if (sourceVertices.indexOf(vertex) == -1) {
-      return;
-    }
-    if (sourceTaskCompleted.get(vertex).contains(taskId)) {
-      return;
+    SrcVertex srcV = srcVerticesByName.get(vertex);
+    if (srcV != null && !srcV.taskCompleted.contains(taskId)) {
+      srcV.taskCompleted.add(taskId);
+      completedSrcTaskToProcess.add(attempt);
     }
-    sourceTaskCompleted.get(vertex).add(taskId);
-    completedSrcTaskToProcess.add(attempt);
   }
 
   private boolean tryStartSchedule() {
     if (!vertexReconfigured || !vertexStarted || numBroadcastSrcNotInRunningState > 0) {
       return false;
     }
-    for (RoaringBitmap bitmap: sourceTaskCompleted.values()) {
-      if (bitmap.isEmpty()) {
+
+    for (Source src : sourcesByName.values()) {
+      if (!src.hasTaskCompleted()) {
         return false;
       }
     }
@@ -178,15 +285,17 @@ class CartesianProductVertexManagerUnpartitioned extends CartesianProductVertexM
     if (vmEvent.getUserPayload() != null) {
       String srcVertex =
         vmEvent.getProducerAttemptIdentifier().getTaskIdentifier().getVertexIdentifier().getName();
-      int position = sourceVertices.indexOf(srcVertex);
+      SrcVertex srcV = srcVerticesByName.get(srcVertex);
+
       // vmEvent from non-cp vertex doesn't matter
-      if (position == -1) {
+      if (srcV == null) {
         return;
       }
+
       VertexManagerEventPayloadProto proto =
         VertexManagerEventPayloadProto.parseFrom(ByteString.copyFrom(vmEvent.getUserPayload()));
-      vertexOutputBytes[position] += proto.getOutputSize();
-      numVertexManagerEventsReceived[position]++;
+      srcV.outputBytes += proto.getOutputSize();
+      srcV.taskWithVMEvent.add(vmEvent.getProducerAttemptIdentifier().getTaskIdentifier().getIdentifier());
       vertexSentVME.add(srcVertex);
     }
 
@@ -197,60 +306,65 @@ class CartesianProductVertexManagerUnpartitioned extends CartesianProductVertexM
     if (numCPSrcNotInConfigureState > 0) {
       return false;
     }
-    if (enableGrouping) {
-      if (vertexSentVME.size() != sourceVertices.size()) {
+    if (config.enableAutoGrouping) {
+      if (vertexSentVME.size() != srcVerticesByName.size()) {
         return false;
       }
-      for (int i = 0; i < vertexOutputBytes.length; i++) {
-        if (vertexOutputBytes[i] < desiredBytesPerGroup
-          && numVertexManagerEventsReceived[i] < numTasks[i]) {
+      // every src v must output at least one chunk size
+      for (SrcVertex srcV : srcVerticesByName.values()) {
+        if (srcV.outputBytes < config.desiredBytesPerChunk
+          && srcV.taskWithVMEvent.getCardinality() < srcV.numTask) {
           return false;
         }
       }
     }
 
-    LOG.info("Start reconfigure, grouping: " + enableGrouping
-      + ", group size: " + desiredBytesPerGroup);
-    LOG.info("src vertices: " + sourceVertices);
-    LOG.info("number of source tasks in each src: " + Arrays.toString(numTasks));
-    LOG.info("number of vmEvent from each src: "
-      + Arrays.toString(numVertexManagerEventsReceived));
-    LOG.info("output stats of each src: " + Arrays.toString(vertexOutputBytes));
-
-    for (int i = 0; i < numTasks.length; i++) {
-      if (enableGrouping) {
-        vertexOutputBytes[i] =
-          vertexOutputBytes[i] * numTasks[i] / numVertexManagerEventsReceived[i];
-        int desiredNumGroup =
-          (int) ((vertexOutputBytes[i] + desiredBytesPerGroup - 1) / desiredBytesPerGroup);
-        numGroups[i] = Math.min(numTasks[i], desiredNumGroup);
-      } else {
-        numGroups[i] = numTasks[i];
+    LOG.info("Start reconfigure, grouping: " + config.enableAutoGrouping
+      + ", chunk size: " + config.desiredBytesPerChunk + " bytes.");
+    for (String srcName : config.getSourceVertices()) {
+      LOG.info(sourcesByName.get(srcName).toString(false));
+    }
+
+    for (Source src : sourcesByName.values()) {
+      for (int i = 0; i < src.srcVertices.size(); i++) {
+        src.srcVertices.get(i).doGrouping();
+        if (i > 0) {
+          src.srcVertices.get(i).chunkIdOffset += src.srcVertices.get(i-1).numChunk;
+        }
       }
-      parallelism *= numGroups[i];
+      numChunksPerSrc[src.position] = src.getNumChunk();
     }
 
-    LOG.info("estimated output size of each src: " + Arrays.toString(vertexOutputBytes));
-    LOG.info("number of groups for each src: " + Arrays.toString(numGroups));
+    int parallelism = 1;
+    for (Source src : sourcesByName.values()) {
+      parallelism *= src.getNumChunk();
+    }
+
+    LOG.info("After reconfigure, ");
+    for (String srcName : config.getSourceVertices()) {
+      LOG.info(sourcesByName.get(srcName).toString(true));
+    }
     LOG.info("Final parallelism: " + parallelism);
 
-    UserPayload payload = null;
+    CartesianProductConfigProto.Builder builder = CartesianProductConfigProto.newBuilder();
+    for (int i = 0; i < numChunksPerSrc.length; i++) {
+      numChunksPerSrc[i] = sourcesByName.get(config.getSourceVertices().get(i)).getNumChunk();
+    }
+    builder.setIsPartitioned(false).addAllSources(config.getSourceVertices())
+      .addAllNumChunks(Ints.asList(this.numChunksPerSrc));
+
     Map<String, EdgeProperty> edgeProperties = getContext().getInputVertexEdgeProperties();
     Iterator<Map.Entry<String,EdgeProperty>> iter = edgeProperties.entrySet().iterator();
     while (iter.hasNext()) {
-      EdgeProperty edgeProperty = iter.next().getValue();
-      if (edgeProperty.getDataMovementType() != CUSTOM) {
+      Map.Entry<String, EdgeProperty> e = iter.next();
+      if (e.getValue().getDataMovementType() != CUSTOM) {
         iter.remove();
-        continue;
-      }
-      EdgeManagerPluginDescriptor descriptor = edgeProperty.getEdgeManagerDescriptor();
-      if (payload == null) {
-        CartesianProductConfigProto.Builder builder = CartesianProductConfigProto.newBuilder();
-        builder.setIsPartitioned(false).addAllNumTasks(Ints.asList(numTasks))
-          .addAllNumGroups(Ints.asList(numGroups)).addAllSourceVertices(config.getSourceVertices());
-        payload = UserPayload.create(ByteBuffer.wrap(builder.build().toByteArray()));
+      } else {
+        SrcVertex srcV = srcVerticesByName.get(e.getKey());
+        builder.setNumChunk(srcV.numChunk).setChunkIdOffset(srcV.chunkIdOffset);
+        e.getValue().getEdgeManagerDescriptor()
+          .setUserPayload(UserPayload.create(ByteBuffer.wrap(builder.build().toByteArray())));
       }
-      descriptor.setUserPayload(payload);
     }
     getContext().reconfigureVertex(parallelism, null, edgeProperties);
     vertexReconfigured = true;
@@ -267,32 +381,42 @@ class CartesianProductVertexManagerUnpartitioned extends CartesianProductVertexM
     }
 
     while (!completedSrcTaskToProcess.isEmpty()) {
-      scheduledTasksDependOnCompletion(completedSrcTaskToProcess.poll());
+      scheduleTasksDependOnCompletion(completedSrcTaskToProcess.poll());
     }
   }
 
-  private void scheduledTasksDependOnCompletion(TaskAttemptIdentifier attempt) {
+  private void scheduleTasksDependOnCompletion(TaskAttemptIdentifier attempt) {
     int taskId = attempt.getTaskIdentifier().getIdentifier();
     String vertex = attempt.getTaskIdentifier().getVertexIdentifier().getName();
-    int position = sourceVertices.indexOf(vertex);
+    SrcVertex srcV = srcVerticesByName.get(vertex);
+    Source src = srcV.source;
 
     List<ScheduleTaskRequest> requests = new ArrayList<>();
     CartesianProductCombination combination =
-      new CartesianProductCombination(numGroups, position);
-    grouper.init(numTasks[position], numGroups[position]);
-    combination.firstTaskWithFixedPartition(grouper.getGroupId(taskId));
+      new CartesianProductCombination(numChunksPerSrc, src.position);
+    grouper.init(srcV.numTask, srcV.numChunk);
+    combination.firstTaskWithFixedChunk(grouper.getGroupId(taskId) + srcV.chunkIdOffset);
     do {
       List<Integer> list = combination.getCombination();
 
-      if (scheduledTasks.contains(combination.getTaskId())) {
+      if (scheduledTasks.contains(combination.getChunkId())) {
         continue;
       }
       boolean readyToSchedule = true;
       for (int i = 0; i < list.size(); i++) {
-        int group = list.get(i);
-        grouper.init(numTasks[i], numGroups[i]);
-        for (int j = grouper.getFirstTaskInGroup(group); j <= grouper.getLastTaskInGroup(group); j++) {
-          if (!sourceTaskCompleted.get(sourceVertices.get(i)).contains(j)) {
+        int chunkId = list.get(i);
+        SrcVertex srcVHasGroup = null;
+        for (SrcVertex v : sourcesByName.get(config.getSourceVertices().get(i)).srcVertices) {
+          if (v.chunkIdOffset <= chunkId && chunkId < v.chunkIdOffset + v.numChunk) {
+            srcVHasGroup = v;
+            break;
+          }
+        }
+        assert srcVHasGroup != null;
+        grouper.init(srcVHasGroup.numTask, srcVHasGroup.numChunk);
+        chunkId -= srcVHasGroup.chunkIdOffset;
+        for (int j = grouper.getFirstTaskInGroup(chunkId); j <= grouper.getLastTaskInGroup(chunkId); j++) {
+          if (!srcVHasGroup.taskCompleted.contains(j)) {
             readyToSchedule = false;
             break;
           }
@@ -303,10 +427,10 @@ class CartesianProductVertexManagerUnpartitioned extends CartesianProductVertexM
       }
 
       if (readyToSchedule) {
-        requests.add(ScheduleTaskRequest.create(combination.getTaskId(), null));
-        scheduledTasks.add(combination.getTaskId());
+        requests.add(ScheduleTaskRequest.create(combination.getChunkId(), null));
+        scheduledTasks.add(combination.getChunkId());
       }
-    } while (combination.nextTaskWithFixedPartition());
+    } while (combination.nextTaskWithFixedChunk());
     if (!requests.isEmpty()) {
       getContext().scheduleTasks(requests);
     }

http://git-wip-us.apache.org/repos/asf/tez/blob/f355a050/tez-runtime-library/src/main/proto/CartesianProductPayload.proto
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/proto/CartesianProductPayload.proto b/tez-runtime-library/src/main/proto/CartesianProductPayload.proto
index dd7d06f..cb503ea 100644
--- a/tez-runtime-library/src/main/proto/CartesianProductPayload.proto
+++ b/tez-runtime-library/src/main/proto/CartesianProductPayload.proto
@@ -21,14 +21,15 @@ option java_outer_classname = "CartesianProductUserPayload";
 
 message CartesianProductConfigProto {
     required bool isPartitioned = 1;
-    repeated string sourceVertices = 2;
+    repeated string sources = 2;
     repeated int32 numPartitions = 3;
     optional string filterClassName = 4;
     optional bytes filterUserPayload = 5;
     optional float minFraction = 6;
     optional float maxFraction = 7;
     optional bool enableAutoGrouping = 8;
-    optional int64 desiredBytesPerGroup = 9;
-    repeated int32 numTasks = 10;
-    repeated int32 numGroups = 11;
+    optional int64 desiredBytesPerChunk = 9;
+    repeated int32 numChunks = 10;
+    optional int32 numChunk = 11;
+    optional int32 chunkIdOffset = 12;
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tez/blob/f355a050/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManagerUtils.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManagerUtils.java b/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManagerUtils.java
index 9a5614e..439d650 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManagerUtils.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManagerUtils.java
@@ -331,6 +331,11 @@ public class TestShuffleVertexManagerUtils {
           public int getDestinationVertexNumTasks() {
             return numTasks;
           }
+
+          @Override
+          public String getVertexGroupName() {
+            return null;
+          }
         };
         if (newEdgeManagers != null) {
           EdgeManagerPlugin edgeManager = ReflectionUtils

http://git-wip-us.apache.org/repos/asf/tez/blob/f355a050/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductCombination.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductCombination.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductCombination.java
index 06d3e90..3755ac8 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductCombination.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductCombination.java
@@ -30,44 +30,44 @@ import static org.junit.Assert.assertTrue;
 public class TestCartesianProductCombination {
   private void verifyCombination(CartesianProductCombination combination, int[] result, int taskId) {
     assertArrayEquals(result, Ints.toArray(combination.getCombination()));
-    assertEquals(taskId, combination.getTaskId());
+    assertEquals(taskId, combination.getChunkId());
   }
 
   private void testCombinationTwoWayVertex0() {
     CartesianProductCombination combination = new CartesianProductCombination(new int[]{2,3}, 0);
 
-    combination.firstTaskWithFixedPartition(1);
+    combination.firstTaskWithFixedChunk(1);
     verifyCombination(combination, new int[]{1,0}, 3);
-    assertTrue(combination.nextTaskWithFixedPartition());
+    assertTrue(combination.nextTaskWithFixedChunk());
     verifyCombination(combination, new int[]{1,1}, 4);
-    assertTrue(combination.nextTaskWithFixedPartition());
+    assertTrue(combination.nextTaskWithFixedChunk());
     verifyCombination(combination, new int[]{1,2}, 5);
-    assertFalse(combination.nextTaskWithFixedPartition());
+    assertFalse(combination.nextTaskWithFixedChunk());
   }
 
   private void testCombinationTwoWayVertex1() {
     CartesianProductCombination combination = new CartesianProductCombination(new int[]{2,3}, 1);
 
-    combination.firstTaskWithFixedPartition(1);
+    combination.firstTaskWithFixedChunk(1);
     verifyCombination(combination, new int[]{0,1}, 1);
-    assertTrue(combination.nextTaskWithFixedPartition());
+    assertTrue(combination.nextTaskWithFixedChunk());
     verifyCombination(combination, new int[]{1,1}, 4);
 
-    assertFalse(combination.nextTaskWithFixedPartition());
+    assertFalse(combination.nextTaskWithFixedChunk());
   }
 
   private void testCombinationThreeWay() {
     CartesianProductCombination combination = new CartesianProductCombination(new int[]{2,2,2}, 1);
 
-    combination.firstTaskWithFixedPartition(1);
+    combination.firstTaskWithFixedChunk(1);
     verifyCombination(combination, new int[]{0,1,0}, 2);
-    assertTrue(combination.nextTaskWithFixedPartition());
+    assertTrue(combination.nextTaskWithFixedChunk());
     verifyCombination(combination, new int[]{0,1,1}, 3);
-    assertTrue(combination.nextTaskWithFixedPartition());
+    assertTrue(combination.nextTaskWithFixedChunk());
     verifyCombination(combination, new int[]{1,1,0}, 6);
-    assertTrue(combination.nextTaskWithFixedPartition());
+    assertTrue(combination.nextTaskWithFixedChunk());
     verifyCombination(combination, new int[]{1,1,1}, 7);
-    assertFalse(combination.nextTaskWithFixedPartition());
+    assertFalse(combination.nextTaskWithFixedChunk());
   }
 
   @Test(timeout = 5000)
@@ -110,9 +110,9 @@ public class TestCartesianProductCombination {
 
   @Test(timeout = 5000)
   public void testRejectZero() {
-    int[] numTasks = new int[] {0 ,1};
+    int[] numChunk = new int[] {0 ,1};
     try {
-      new CartesianProductCombination(numTasks);
+      new CartesianProductCombination(numChunk);
       assertTrue(false);
     } catch (Exception ignored) {}
   }

http://git-wip-us.apache.org/repos/asf/tez/blob/f355a050/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductConfig.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductConfig.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductConfig.java
index c9e49a3..4857749 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductConfig.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductConfig.java
@@ -122,15 +122,15 @@ public class TestCartesianProductConfig {
     // auto grouping conf not set
     CartesianProductConfigProto proto = config.toProto(conf);
     assertFalse(proto.hasEnableAutoGrouping());
-    assertFalse(proto.hasDesiredBytesPerGroup());
+    assertFalse(proto.hasDesiredBytesPerChunk());
 
     // auto groupinig conf not set
     conf.setBoolean(CartesianProductVertexManager.TEZ_CARTESIAN_PRODUCT_ENABLE_AUTO_GROUPING, true);
     conf.setLong(CartesianProductVertexManager.TEZ_CARTESIAN_PRODUCT_DESIRED_BYTES_PER_GROUP, 1000);
     proto = config.toProto(conf);
     assertTrue(proto.hasEnableAutoGrouping());
-    assertTrue(proto.hasDesiredBytesPerGroup());
+    assertTrue(proto.hasDesiredBytesPerChunk());
     assertEquals(true, proto.getEnableAutoGrouping());
-    assertEquals(1000, proto.getDesiredBytesPerGroup());
+    assertEquals(1000, proto.getDesiredBytesPerChunk());
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tez/blob/f355a050/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductEdgeManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductEdgeManager.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductEdgeManager.java
index 12aee3b..d722932 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductEdgeManager.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductEdgeManager.java
@@ -40,7 +40,7 @@ public class TestCartesianProductEdgeManager {
     // partitioned case
     CartesianProductConfigProto.Builder builder = CartesianProductConfigProto.newBuilder();
     builder.setIsPartitioned(true)
-      .addAllSourceVertices(Arrays.asList("v0", "v1"))
+      .addAllSources(Arrays.asList("v0", "v1"))
       .addAllNumPartitions(Ints.asList(2,3));
     UserPayload payload = UserPayload.create(ByteBuffer.wrap(builder.build().toByteArray()));
     when(context.getUserPayload()).thenReturn(payload);
@@ -51,8 +51,8 @@ public class TestCartesianProductEdgeManager {
     // unpartitioned case
     builder.clear();
     builder.setIsPartitioned(false)
-      .addAllSourceVertices(Arrays.asList("v0", "v1"))
-      .addAllNumTasks(Ints.asList(2,3));
+      .addAllSources(Arrays.asList("v0", "v1"))
+      .addAllNumChunks(Ints.asList(2,3));
     payload = UserPayload.create(ByteBuffer.wrap(builder.build().toByteArray()));
     when(context.getUserPayload()).thenReturn(payload);
     when(context.getSourceVertexNumTasks()).thenReturn(2);

http://git-wip-us.apache.org/repos/asf/tez/blob/f355a050/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductEdgeManagerConfig.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductEdgeManagerConfig.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductEdgeManagerConfig.java
index 9f6fa09..3ba6aad 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductEdgeManagerConfig.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductEdgeManagerConfig.java
@@ -28,23 +28,26 @@ import java.util.ArrayList;
 import java.util.List;
 
 import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
 
 public class TestCartesianProductEdgeManagerConfig {
   @Test(timeout = 5000)
-  public void testAutoGroupingConfig() throws IOException {
+  public void testUnpartitionedAutoGroupingConfig() throws IOException {
     List<String> sourceVertices = new ArrayList<>();
     sourceVertices.add("v0");
     sourceVertices.add("v1");
-    int[] numTasks = new int[] {4, 5};
-    int[] numGroups = new int[] {2, 3};
+    int[] numChunkPerSrc = new int[] {2, 3};
+    int numGroup = 3, chunkIdOffset = 0;
 
     CartesianProductConfigProto.Builder builder = CartesianProductConfigProto.newBuilder();
-    builder.setIsPartitioned(false).addAllNumTasks(Ints.asList(numTasks))
-      .addAllSourceVertices(sourceVertices).addAllNumGroups(Ints.asList(numGroups));
+    builder.setIsPartitioned(false).addAllNumChunks(Ints.asList(numChunkPerSrc))
+      .addAllSources(sourceVertices).setNumChunk(numGroup).setChunkIdOffset(chunkIdOffset);
     UserPayload payload = UserPayload.create(ByteBuffer.wrap(builder.build().toByteArray()));
 
     CartesianProductEdgeManagerConfig config =
       CartesianProductEdgeManagerConfig.fromUserPayload(payload);
-    assertArrayEquals(numGroups, config.getNumGroups());
+    assertArrayEquals(numChunkPerSrc, config.numChunksPerSrc);
+    assertEquals(numGroup, config.numChunk);
+    assertEquals(chunkIdOffset, config.chunkIdOffset);
   }
 }