You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by zh...@apache.org on 2017/04/10 18:20:15 UTC

[2/2] tez git commit: TEZ-3654. Make CartesianProduct edge work with GroupInputEdge (zhiyuany)

TEZ-3654. Make CartesianProduct edge work with GroupInputEdge (zhiyuany)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/f355a050
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/f355a050
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/f355a050

Branch: refs/heads/master
Commit: f355a050cf012854d1f7145d05f1981a3cf97e67
Parents: e0ee28a
Author: Zhiyuan Yang <zh...@apache.org>
Authored: Mon Apr 10 11:18:33 2017 -0700
Committer: Zhiyuan Yang <zh...@apache.org>
Committed: Mon Apr 10 11:18:33 2017 -0700

----------------------------------------------------------------------
 .../tez/dag/api/EdgeManagerPluginContext.java   |   4 +
 .../tez/dag/api/VertexManagerPluginContext.java |   9 +
 .../java/org/apache/tez/dag/app/dag/Vertex.java |   2 +-
 .../org/apache/tez/dag/app/dag/impl/Edge.java   |  12 +
 .../apache/tez/dag/app/dag/impl/VertexImpl.java |   4 +-
 .../tez/dag/app/dag/impl/VertexManager.java     |  14 +
 .../apache/tez/dag/app/dag/impl/TestEdge.java   |  32 ++
 .../tez/dag/app/dag/impl/TestVertexImpl.java    |   6 +-
 .../tez/dag/app/dag/impl/TestVertexManager.java |  26 +
 .../CartesianProductCombination.java            |  78 ++-
 .../CartesianProductConfig.java                 |  60 +-
 .../CartesianProductEdgeManagerConfig.java      |  39 +-
 .../CartesianProductEdgeManagerPartitioned.java |   8 +-
 .../CartesianProductEdgeManagerReal.java        |   1 -
 ...artesianProductEdgeManagerUnpartitioned.java |  57 +-
 .../CartesianProductVertexManager.java          |  51 +-
 .../CartesianProductVertexManagerConfig.java    |  40 +-
 ...artesianProductVertexManagerPartitioned.java |  10 +-
 ...tesianProductVertexManagerUnpartitioned.java | 316 +++++++----
 .../main/proto/CartesianProductPayload.proto    |   9 +-
 .../TestShuffleVertexManagerUtils.java          |   5 +
 .../TestCartesianProductCombination.java        |  30 +-
 .../TestCartesianProductConfig.java             |   6 +-
 .../TestCartesianProductEdgeManager.java        |   6 +-
 .../TestCartesianProductEdgeManagerConfig.java  |  15 +-
 ...tCartesianProductEdgeManagerPartitioned.java |  13 +-
 ...artesianProductEdgeManagerUnpartitioned.java | 408 +++++++------
 .../TestCartesianProductVertexManager.java      |  19 +
 ...TestCartesianProductVertexManagerConfig.java |   8 +-
 ...tesianProductVertexManagerUnpartitioned.java | 565 +++++++++++--------
 30 files changed, 1099 insertions(+), 754 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/f355a050/tez-api/src/main/java/org/apache/tez/dag/api/EdgeManagerPluginContext.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/EdgeManagerPluginContext.java b/tez-api/src/main/java/org/apache/tez/dag/api/EdgeManagerPluginContext.java
index 79f685d..ef6925b 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/EdgeManagerPluginContext.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/EdgeManagerPluginContext.java
@@ -58,4 +58,8 @@ public interface EdgeManagerPluginContext {
    */
   public int getDestinationVertexNumTasks();
 
+  /**
+   * @return the name of the vertex group that the source vertex belongs to, or null if none
+   */
+  String getVertexGroupName();
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/f355a050/tez-api/src/main/java/org/apache/tez/dag/api/VertexManagerPluginContext.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/VertexManagerPluginContext.java b/tez-api/src/main/java/org/apache/tez/dag/api/VertexManagerPluginContext.java
index aa99745..b858a65 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/VertexManagerPluginContext.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/VertexManagerPluginContext.java
@@ -350,4 +350,13 @@ public interface VertexManagerPluginContext {
   // TODO must be done later after TEZ-1714
   //public void vertexManagerDone();
 
+  /**
+   * Get the input vertex groups of this vertex, including each group's name and
+   * the names of all its member vertices.
+   *
+   * @return map whose key is vertex group name and value is list of members' name,
+   *         or empty map if there is no input vertex group.
+   */
+  Map<String, List<String>> getInputVertexGroups();
+
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tez/blob/f355a050/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java
index 1b3b39c..51847d4 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java
@@ -149,7 +149,7 @@ public interface Vertex extends Comparable<Vertex> {
   List<InputSpec> getInputSpecList(int taskIndex) throws AMUserCodeException;
   List<OutputSpec> getOutputSpecList(int taskIndex) throws AMUserCodeException;
   
-  List<GroupInputSpec> getGroupInputSpecList(int taskIndex);
+  List<GroupInputSpec> getGroupInputSpecList();
   void addSharedOutputs(Set<String> outputs);
   Set<String> getSharedOutputs();
 

http://git-wip-us.apache.org/repos/asf/tez/blob/f355a050/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/Edge.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/Edge.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/Edge.java
index 690df63..f78c9a5 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/Edge.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/Edge.java
@@ -28,6 +28,7 @@ import java.util.zip.Deflater;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.tez.common.TezCommonUtils;
 import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.runtime.api.impl.GroupInputSpec;
 import org.apache.tez.runtime.library.common.shuffle.ShuffleUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -129,6 +130,17 @@ public class Edge {
       return destinationVertex.getTotalTasks();
     }
 
+    @Override
+    public String getVertexGroupName() {
+      if (destinationVertex.getGroupInputSpecList() != null) {
+        for (GroupInputSpec group : destinationVertex.getGroupInputSpecList()) {
+          if (group.getGroupVertices().contains(getSourceVertexName())) {
+            return group.getGroupName();
+          }
+        }
+      }
+      return null;
+    }
   }
 
   private EdgeProperty edgeProperty;

http://git-wip-us.apache.org/repos/asf/tez/blob/f355a050/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
index 1ab3da8..ab17fe4 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
@@ -1581,7 +1581,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
     return TaskSpec.createBaseTaskSpec(getDAG().getName(),
         getName(), getTotalTasks(), getProcessorDescriptor(),
         getInputSpecList(taskIndex), getOutputSpecList(taskIndex), 
-        getGroupInputSpecList(taskIndex), vertexOnlyConf);
+        getGroupInputSpecList(), vertexOnlyConf);
   }
 
   @Override
@@ -4422,7 +4422,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
 
 
   @Override
-  public List<GroupInputSpec> getGroupInputSpecList(int taskIndex) {
+  public List<GroupInputSpec> getGroupInputSpecList() {
     readLock.lock();
     try {
       return groupInputSpecList;

http://git-wip-us.apache.org/repos/asf/tez/blob/f355a050/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java
index 45f72bd..b7d3428 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java
@@ -24,6 +24,7 @@ import java.lang.reflect.UndeclaredThrowableException;
 import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -35,6 +36,7 @@ import javax.annotation.Nullable;
 
 import org.apache.commons.lang.exception.ExceptionUtils;
 import org.apache.tez.dag.app.dag.event.DAGEventInternalError;
+import org.apache.tez.runtime.api.impl.GroupInputSpec;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -360,6 +362,18 @@ public class VertexManager {
     }
 
     @Override
+    public Map<String, List<String>> getInputVertexGroups() {
+      checkAndThrowIfDone();
+      Map<String, List<String>> inputGroups = Maps.newHashMap();
+      if (managedVertex.getGroupInputSpecList() != null) {
+        for (GroupInputSpec group : managedVertex.getGroupInputSpecList()) {
+          inputGroups.put(group.getGroupName(), Collections.unmodifiableList(group.getGroupVertices()));
+        }
+      }
+      return inputGroups;
+    }
+
+    @Override
     public void onStateUpdated(VertexStateUpdate event) {
       // this is not called by the vertex manager plugin. 
       // no need to synchronize this. similar to other external notification methods

http://git-wip-us.apache.org/repos/asf/tez/blob/f355a050/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestEdge.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestEdge.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestEdge.java
index f53e505..1143395 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestEdge.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestEdge.java
@@ -19,6 +19,7 @@
 package org.apache.tez.dag.app.dag.impl;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.doReturn;
@@ -34,6 +35,7 @@ import java.io.DataOutput;
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.LinkedHashMap;
@@ -67,7 +69,9 @@ import org.apache.tez.runtime.api.events.DataMovementEvent;
 import org.apache.tez.runtime.api.events.InputReadErrorEvent;
 import org.apache.tez.runtime.api.impl.EventMetaData;
 import org.apache.tez.runtime.api.impl.EventMetaData.EventProducerConsumerType;
+import org.apache.tez.runtime.api.impl.GroupInputSpec;
 import org.apache.tez.runtime.api.impl.TezEvent;
+import org.apache.tez.test.EdgeManagerForTest;
 import org.junit.Assert;
 import org.junit.Test;
 import org.mockito.ArgumentCaptor;
@@ -476,4 +480,32 @@ public class TestEdge {
       return emConf.sourceTaskIndex;
     }
   }
+
+  @Test(timeout = 5000)
+  public void testEdgeManagerPluginCtxGetVertexGroupName() throws TezException {
+    EdgeManagerPluginDescriptor edgeManagerDescriptor =
+      EdgeManagerPluginDescriptor.create(EdgeManagerForTest.class.getName());
+    EdgeProperty edgeProp = EdgeProperty.create(edgeManagerDescriptor,
+      DataSourceType.PERSISTED, SchedulingType.SEQUENTIAL, OutputDescriptor.create("Out"),
+      InputDescriptor.create("In"));
+    Edge edge = new Edge(edgeProp, null, null);
+
+    Vertex srcV = mock(Vertex.class), destV = mock(Vertex.class);
+    String srcName = "srcV", destName = "destV";
+    when(srcV.getName()).thenReturn(srcName);
+    when(destV.getName()).thenReturn(destName);
+    edge.setSourceVertex(srcV);
+    edge.setDestinationVertex(destV);
+
+    assertNull(edge.edgeManager.getContext().getVertexGroupName());
+
+    String group = "group";
+    when(destV.getGroupInputSpecList())
+      .thenReturn(Arrays.asList(new GroupInputSpec(group, Arrays.asList("v1", "v3"), null)));
+    assertNull(edge.edgeManager.getContext().getVertexGroupName());
+
+    when(destV.getGroupInputSpecList())
+      .thenReturn(Arrays.asList(new GroupInputSpec(group, Arrays.asList(srcName, "v3"), null)));
+    assertEquals(group, edge.edgeManager.getContext().getVertexGroupName());
+  }
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/f355a050/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
index 90d675e..bc06fd0 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
@@ -5940,10 +5940,10 @@ public class TestVertexImpl {
         VertexEventType.V_INIT));
     dispatcher.await();
     
-    Assert.assertNull(vA.getGroupInputSpecList(0));
-    Assert.assertNull(vB.getGroupInputSpecList(0));
+    Assert.assertNull(vA.getGroupInputSpecList());
+    Assert.assertNull(vB.getGroupInputSpecList());
     
-    List<GroupInputSpec> groupInSpec = vC.getGroupInputSpecList(0);
+    List<GroupInputSpec> groupInSpec = vC.getGroupInputSpecList();
     Assert.assertEquals(1, groupInSpec.size());
     Assert.assertEquals("Group", groupInSpec.get(0).getGroupName());
     assertTrue(groupInSpec.get(0).getGroupVertices().contains("A"));

http://git-wip-us.apache.org/repos/asf/tez/blob/f355a050/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexManager.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexManager.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexManager.java
index b93b298..6bec26e 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexManager.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexManager.java
@@ -20,6 +20,8 @@ package org.apache.tez.dag.app.dag.impl;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
 import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
 import static org.mockito.Mockito.doReturn;
@@ -30,6 +32,8 @@ import static org.mockito.Mockito.when;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.LinkedList;
@@ -55,6 +59,7 @@ import org.apache.tez.runtime.api.Event;
 import org.apache.tez.runtime.api.TaskAttemptIdentifier;
 import org.apache.tez.runtime.api.events.InputDataInformationEvent;
 import org.apache.tez.runtime.api.events.VertexManagerEvent;
+import org.apache.tez.runtime.api.impl.GroupInputSpec;
 import org.apache.tez.runtime.api.impl.TezEvent;
 import org.junit.Before;
 import org.junit.Test;
@@ -205,6 +210,27 @@ public class TestVertexManager {
     assertEquals(Sets.newHashSet("input1","input2"), edgeVertexSet);
   }
 
+  @Test(timeout = 5000)
+  public void testVMPluginCtxGetInputVertexGroup() throws Exception {
+    VertexManager vm =
+      new VertexManager(
+        VertexManagerPluginDescriptor.create(CustomVertexManager.class
+          .getName()), UserGroupInformation.getCurrentUser(),
+        mockVertex, mockAppContext, mock(StateChangeNotifier.class));
+
+    assertTrue(vm.pluginContext.getInputVertexGroups().isEmpty());
+
+    String group = "group", v1 = "v1", v2 = "v2";
+    when(mockVertex.getGroupInputSpecList())
+      .thenReturn(Arrays.asList(new GroupInputSpec(group, Arrays.asList(v1, v2), null)));
+    Map<String, List<String>> groups = vm.pluginContext.getInputVertexGroups();
+    assertEquals(1, groups.size());
+    assertTrue(groups.containsKey(group));
+    assertEquals(2, groups.get(group).size());
+    assertTrue(groups.get(group).contains(v1));
+    assertTrue(groups.get(group).contains(v2));
+  }
+
   public static class CustomVertexManager extends VertexManagerPlugin {
 
     private Map<String,List<Event>> cachedEventMap = new HashMap<String, List<Event>>();

http://git-wip-us.apache.org/repos/asf/tez/blob/f355a050/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductCombination.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductCombination.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductCombination.java
index c6c95f2..97f3eb2 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductCombination.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductCombination.java
@@ -25,49 +25,47 @@ import java.util.Collections;
 import java.util.List;
 
 /**
- * Represent the combination of source partitions or tasks.
+ * Represents the combination of source chunks. A chunk is one or more source tasks or partitions.
  *
- * For example, if we have two source vertices and each generates two partition, we will have 2*2=4
- * destination tasks. The mapping from source partition/task to destination task is like this:
+ * For example, if we have two source vertices and each generates two chunks, we will have 2*2=4
+ * destination tasks. The mapping from source chunks to destination task is like this:
  * <0, 0> -> 0, <0, 1> -> 1, <1, 0> -> 2, <1, 1> -> 3;
  *
- * Basically, it stores the source partition/task combination and can compute corresponding
+ * Basically, it stores the source chunk id combination and can compute corresponding
  * destination task. It can also figure out the source combination from a given destination task.
- * Task id is mapped in the ascending order of combinations, starting from 0. <field>factor</field>
- * is the helper array to computer task id, so task id = (combination) dot-product (factor)
+ * Task id is mapped in the ascending order of combinations, starting from 0.
  *
  * You can traverse all combinations with <method>firstTask</method> and <method>nextTask</method>,
  * like <0, 0> -> <0, 1> -> <1, 0> -> <1, 1>.
  *
- * Or you can also traverse all combinations that has one specific partition with
- * <method>firstTaskWithFixedPartition</method> and <method>nextTaskWithFixedPartition</method>,
+ * Or you can also traverse all combinations that have one specific chunk with
+ * <method>firstTaskWithFixedChunk</method> and <method>nextTaskWithFixedChunk</method>,
  * like <0, 1, 0> -> <0, 1, 1> -> <1, 1, 0> -> <1, 1, 1> (all combinations with 2nd vertex's 2nd
- * partition.
+ * chunk).
  */
 class CartesianProductCombination {
-  // numPartitions for partitioned case, numTasks for unpartitioned case
-  private int[] numPartitionOrTask;
-  // at which position (in source vertices array) our vertex is
+  private int[] numChunk;
+  // which position (in source vertices array) we care about
   private int positionId = -1;
-  // The i-th element Ci represents partition/task Ci of source vertex i.
+  // The i-th element Ci represents chunk Ci of source vertex i.
   private final Integer[] combination;
-  // the weight of each vertex when computing the task id
+  // helper array to compute task id: task id = (combination) dot-product (factor)
   private final Integer[] factor;
 
-  public CartesianProductCombination(int[] numPartitionOrTask) {
-    Preconditions.checkArgument(!Ints.contains(numPartitionOrTask, 0),
-      "CartesianProductCombination doesn't allow zero partition or task");
-    this.numPartitionOrTask = Arrays.copyOf(numPartitionOrTask, numPartitionOrTask.length);
-    combination = new Integer[numPartitionOrTask.length];
-    factor = new Integer[numPartitionOrTask.length];
+  public CartesianProductCombination(int[] numChunk) {
+    Preconditions.checkArgument(!Ints.contains(numChunk, 0),
+      "CartesianProductCombination doesn't allow zero chunk");
+    this.numChunk = Arrays.copyOf(numChunk, numChunk.length);
+    combination = new Integer[numChunk.length];
+    factor = new Integer[numChunk.length];
     factor[factor.length-1] = 1;
     for (int i = combination.length-2; i >= 0; i--) {
-      factor[i] = factor[i+1]*numPartitionOrTask[i+1];
+      factor[i] = factor[i+1]* numChunk[i+1];
     }
   }
 
-  public CartesianProductCombination(int[] numPartitionOrTask, int positionId) {
-    this(numPartitionOrTask);
+  public CartesianProductCombination(int[] numChunk, int positionId) {
+    this(numChunk);
     this.positionId = positionId;
   }
 
@@ -79,24 +77,24 @@ class CartesianProductCombination {
   }
 
   /**
-   * first combination with given partition id in current position
-   * @param partition
+   * first combination with given chunk id in current position
+   * @param chunkId the chunk id to place at the current position
    */
-  public void firstTaskWithFixedPartition(int partition) {
+  public void firstTaskWithFixedChunk(int chunkId) {
     Preconditions.checkArgument(positionId >= 0 && positionId < combination.length);
     Arrays.fill(combination, 0);
-    combination[positionId] = partition;
+    combination[positionId] = chunkId;
   }
 
   /**
+   * next combination, keeping the chunk in current position fixed
+   * next combination without current chunk in current position
    * @return false if there is no next combination
    */
-  public boolean nextTaskWithFixedPartition() {
+  public boolean nextTaskWithFixedChunk() {
     Preconditions.checkArgument(positionId >= 0 && positionId < combination.length);
     int i;
     for (i = combination.length-1; i >= 0; i--) {
-      if (i != positionId && combination[i] != numPartitionOrTask[i]-1) {
+      if (i != positionId && combination[i] != numChunk[i]-1) {
         break;
       }
     }
@@ -117,20 +115,20 @@ class CartesianProductCombination {
   }
 
   /**
-   * first combination with given partition id in current position
+   * first combination, with every chunk id set to 0
    */
   public void firstTask() {
     Arrays.fill(combination, 0);
   }
 
   /**
-   * next combination without current partition in current position
+   * next combination in ascending order, advancing across all positions
    * @return false if there is no next combination
    */
   public boolean nextTask() {
     int i;
     for (i = combination.length-1; i >= 0; i--) {
-      if (combination[i] != numPartitionOrTask[i]-1) {
+      if (combination[i] != numChunk[i]-1) {
         break;
       }
     }
@@ -145,19 +143,19 @@ class CartesianProductCombination {
   }
 
   /**
-   * @return corresponding task id for current combination
+   * @return corresponding chunk id for current combination
    */
-  public int getTaskId() {
-    int taskId = 0;
+  public int getChunkId() {
+    int chunkId = 0;
     for (int i = 0; i < combination.length; i++) {
-      taskId += combination[i]*factor[i];
+      chunkId += combination[i]*factor[i];
     }
-    return taskId;
+    return chunkId;
   }
 
-  public static CartesianProductCombination fromTaskId(int[] numPartitionOrTask,
+  public static CartesianProductCombination fromTaskId(int[] numChunk,
                                                        int taskId) {
-    CartesianProductCombination result = new CartesianProductCombination(numPartitionOrTask);
+    CartesianProductCombination result = new CartesianProductCombination(numChunk);
     for (int i = 0; i < result.combination.length; i++) {
       result.combination[i] = taskId/result.factor[i];
       taskId %= result.factor[i];

http://git-wip-us.apache.org/repos/asf/tez/blob/f355a050/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductConfig.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductConfig.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductConfig.java
index b57ed84..12a17cb 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductConfig.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductConfig.java
@@ -46,21 +46,23 @@ import static org.apache.tez.runtime.library.cartesianproduct.CartesianProductUs
 @Evolving
 public class CartesianProductConfig {
   private final boolean isPartitioned;
-  private final String[] sourceVertices;
+  private final String[] sources;
+  // numPartitions[i] means how many partitions sources[i] will generate
+  // (null in unpartitioned case)
   private final int[] numPartitions;
   private final CartesianProductFilterDescriptor filterDescriptor;
 
   /**
    * create config for unpartitioned case
-   * @param sourceVertices list of source vertices names
+   * @param sources list of names of source vertices or vertex groups
    */
-  public CartesianProductConfig(List<String> sourceVertices) {
-    Preconditions.checkArgument(sourceVertices != null, "source vertices list cannot be null");
-    Preconditions.checkArgument(sourceVertices.size() > 1,
-      "there must be more than 1 source " + "vertices, currently only " + sourceVertices.size());
+  public CartesianProductConfig(List<String> sources) {
+    Preconditions.checkArgument(sources != null, "source list cannot be null");
+    Preconditions.checkArgument(sources.size() > 1,
+      "there must be more than 1 source " + ", currently only " + sources.size());
 
     this.isPartitioned = false;
-    this.sourceVertices = sourceVertices.toArray(new String[sourceVertices.size()]);
+    this.sources = sources.toArray(new String[sources.size()]);
     this.numPartitions = null;
     this.filterDescriptor = null;
   }
@@ -86,12 +88,12 @@ public class CartesianProductConfig {
 
     this.isPartitioned = true;
     this.numPartitions = new int[vertexPartitionMap.size()];
-    this.sourceVertices = new String[vertexPartitionMap.size()];
+    this.sources = new String[vertexPartitionMap.size()];
     this.filterDescriptor = filterDescriptor;
 
     int i = 0;
     for (Map.Entry<String, Integer> entry : vertexPartitionMap.entrySet()) {
-      this.sourceVertices[i] = entry.getKey();
+      this.sources[i] = entry.getKey();
       this.numPartitions[i] = entry.getValue();
       i++;
     }
@@ -102,23 +104,23 @@ public class CartesianProductConfig {
   /**
    * create config for partitioned case, with specified source vertices order
    * @param numPartitions
-   * @param sourceVertices
+   * @param sources names of source vertices or vertex groups, parallel to numPartitions
    * @param filterDescriptor
    */
   @VisibleForTesting
-  protected CartesianProductConfig(int[] numPartitions, String[] sourceVertices,
+  protected CartesianProductConfig(int[] numPartitions, String[] sources,
                                    CartesianProductFilterDescriptor filterDescriptor) {
     Preconditions.checkArgument(numPartitions != null, "partitions count array can't be null");
-    Preconditions.checkArgument(sourceVertices != null, "source vertices array can't be null");
-    Preconditions.checkArgument(numPartitions.length == sourceVertices.length,
-      "partitions count array(length: " + numPartitions.length + ") and source vertices array " +
-        "(length: " + sourceVertices.length + ") cannot have different length");
-    Preconditions.checkArgument(sourceVertices.length > 1,
-      "there must be more than 1 source " + "vertices, currently only " + sourceVertices.length);
+    Preconditions.checkArgument(sources != null, "source array can't be null");
+    Preconditions.checkArgument(numPartitions.length == sources.length,
+      "partitions count array(length: " + numPartitions.length + ") and source array " +
+        "(length: " + sources.length + ") cannot have different length");
+    Preconditions.checkArgument(sources.length > 1,
+      "there must be more than 1 source " + ", currently only " + sources.length);
 
     this.isPartitioned = true;
     this.numPartitions = numPartitions;
-    this.sourceVertices = sourceVertices;
+    this.sources = sources;
     this.filterDescriptor = filterDescriptor;
 
     checkNumPartitions();
@@ -128,11 +130,11 @@ public class CartesianProductConfig {
    * create config for both cases, used by subclass
    */
   protected CartesianProductConfig(boolean isPartitioned, int[] numPartitions,
-                                   String[] sourceVertices,
+                                   String[] sources,
                                    CartesianProductFilterDescriptor filterDescriptor) {
     this.isPartitioned = isPartitioned;
     this.numPartitions = numPartitions;
-    this.sourceVertices = sourceVertices;
+    this.sources = sources;
     this.filterDescriptor = filterDescriptor;
   }
 
@@ -142,11 +144,11 @@ public class CartesianProductConfig {
       boolean isUnpartitioned = true;
       for (int i = 0; i < numPartitions.length; i++) {
         Preconditions.checkArgument(this.numPartitions[i] > 0,
-          "Vertex " + sourceVertices[i] + "has negative (" + numPartitions[i] + ") partitions");
+          "Vertex " + sources[i] + "has negative (" + numPartitions[i] + ") partitions");
         isUnpartitioned = isUnpartitioned && numPartitions[i] == 1;
       }
       Preconditions.checkArgument(!isUnpartitioned,
-        "every source vertex has 1 partition in a partitioned case");
+        "every source has 1 partition in a partitioned case");
     } else {
       Preconditions.checkArgument(this.numPartitions == null,
         "partition counts should be null in unpartitioned case");
@@ -154,10 +156,10 @@ public class CartesianProductConfig {
   }
 
   /**
-   * @return the array of source vertices names
+   * @return the list of source vertex (or source vertex group) names
    */
   public List<String> getSourceVertices() {
-    return Collections.unmodifiableList(Arrays.asList(sourceVertices));
+    return Collections.unmodifiableList(Arrays.asList(sources));
   }
 
   /**
@@ -187,7 +189,7 @@ public class CartesianProductConfig {
     CartesianProductConfigProto.Builder builder =
       CartesianProductConfigProto.newBuilder();
     builder.setIsPartitioned(this.isPartitioned)
-      .addAllSourceVertices(Arrays.asList(sourceVertices));
+      .addAllSources(Arrays.asList(sources));
 
     if (isPartitioned) {
       builder.addAllNumPartitions(Ints.asList(numPartitions));
@@ -220,7 +222,7 @@ public class CartesianProductConfig {
       String desiredBytesPerGroup =
         conf.get(CartesianProductVertexManager.TEZ_CARTESIAN_PRODUCT_DESIRED_BYTES_PER_GROUP);
       if (desiredBytesPerGroup != null) {
-        builder.setDesiredBytesPerGroup(Long.parseLong(desiredBytesPerGroup));
+        builder.setDesiredBytesPerChunk(Long.parseLong(desiredBytesPerGroup));
       }
     }
     Preconditions.checkArgument(builder.getMinFraction() <= builder.getMaxFraction(),
@@ -246,10 +248,10 @@ public class CartesianProductConfig {
   protected static CartesianProductConfig fromProto(
     CartesianProductConfigProto proto) {
     if (!proto.getIsPartitioned()) {
-      return new CartesianProductConfig(proto.getSourceVerticesList());
+      return new CartesianProductConfig(proto.getSourcesList());
     } else {
-      String[] sourceVertices = new String[proto.getSourceVerticesList().size()];
-      proto.getSourceVerticesList().toArray(sourceVertices);
+      String[] sourceVertices = new String[proto.getSourcesList().size()];
+      proto.getSourcesList().toArray(sourceVertices);
       CartesianProductFilterDescriptor filterDescriptor = null;
       if (proto.hasFilterClassName()) {
         filterDescriptor = new CartesianProductFilterDescriptor(proto.getFilterClassName());

http://git-wip-us.apache.org/repos/asf/tez/blob/f355a050/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductEdgeManagerConfig.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductEdgeManagerConfig.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductEdgeManagerConfig.java
index 0347f67..df0bcfa 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductEdgeManagerConfig.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductEdgeManagerConfig.java
@@ -27,23 +27,18 @@ import java.nio.ByteBuffer;
 import static org.apache.tez.runtime.library.cartesianproduct.CartesianProductUserPayload.CartesianProductConfigProto;
 
 class CartesianProductEdgeManagerConfig extends CartesianProductConfig {
-  private final int[] numTasks;
-  private final int[] numGroups;
+  final int[] numChunksPerSrc;
+  final int numChunk;
+  final int chunkIdOffset;
 
   protected CartesianProductEdgeManagerConfig(boolean isPartitioned, String[] sourceVertices,
-                                            int[] numPartitions, int[] numTasks, int[] numGroups,
-                                            CartesianProductFilterDescriptor filterDescriptor) {
+                                              int[] numPartitions, int[] numChunksPerSrc, int numChunk,
+                                              int chunkIdOffset,
+                                              CartesianProductFilterDescriptor filterDescriptor) {
     super(isPartitioned, numPartitions, sourceVertices, filterDescriptor);
-    this.numTasks = numTasks;
-    this.numGroups = numGroups;
-  }
-
-  public int[] getNumTasks() {
-    return this.numTasks;
-  }
-
-  public int[] getNumGroups() {
-    return this.numGroups;
+    this.numChunksPerSrc = numChunksPerSrc;
+    this.numChunk = numChunk;
+    this.chunkIdOffset = chunkIdOffset;
   }
 
   public static CartesianProductEdgeManagerConfig fromUserPayload(UserPayload payload)
@@ -52,8 +47,8 @@ class CartesianProductEdgeManagerConfig extends CartesianProductConfig {
       CartesianProductConfigProto.parseFrom(ByteString.copyFrom(payload.getPayload()));
 
     boolean isPartitioned = proto.getIsPartitioned();
-    String[] sourceVertices = new String[proto.getSourceVerticesList().size()];
-    proto.getSourceVerticesList().toArray(sourceVertices);
+    String[] sources = new String[proto.getSourcesList().size()];
+    proto.getSourcesList().toArray(sources);
     int[] numPartitions =
       proto.getNumPartitionsCount() == 0 ? null : Ints.toArray(proto.getNumPartitionsList());
     CartesianProductFilterDescriptor filterDescriptor = proto.hasFilterClassName()
@@ -62,11 +57,11 @@ class CartesianProductEdgeManagerConfig extends CartesianProductConfig {
       filterDescriptor.setUserPayload(
         UserPayload.create(ByteBuffer.wrap(proto.getFilterUserPayload().toByteArray())));
     }
-    int[] numTasks =
-      proto.getNumTasksCount() == 0 ? null : Ints.toArray(proto.getNumTasksList());
-    int[] numGroups =
-      proto.getNumGroupsCount() == 0 ? null : Ints.toArray(proto.getNumGroupsList());
-    return new CartesianProductEdgeManagerConfig(isPartitioned, sourceVertices, numPartitions,
-      numTasks, numGroups, filterDescriptor);
+    int[] humChunksPerSrc =
+      proto.getNumChunksCount() == 0 ? null : Ints.toArray(proto.getNumChunksList());
+    int numChunk = proto.getNumChunk();
+    int chunkIdOffset = proto.getChunkIdOffset();
+    return new CartesianProductEdgeManagerConfig(isPartitioned, sources, numPartitions,
+      humChunksPerSrc, numChunk, chunkIdOffset, filterDescriptor);
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tez/blob/f355a050/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductEdgeManagerPartitioned.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductEdgeManagerPartitioned.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductEdgeManagerPartitioned.java
index 068da81..5ece5cf 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductEdgeManagerPartitioned.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductEdgeManagerPartitioned.java
@@ -107,13 +107,13 @@ class CartesianProductEdgeManagerPartitioned extends CartesianProductEdgeManager
     CartesianProductCombination combination =
       new CartesianProductCombination(numPartitions);
     combination.firstTask();
-    List<String> sourceVertices = config.getSourceVertices();
+    List<String> sources = config.getSourceVertices();
     do {
-      for (int i = 0; i < sourceVertices.size(); i++) {
-        vertexPartitionMap.put(sourceVertices.get(i), combination.getCombination().get(i));
+      for (int i = 0; i < sources.size(); i++) {
+        vertexPartitionMap.put(sources.get(i), combination.getCombination().get(i));
       }
       if (filter == null || filter.isValidCombination(vertexPartitionMap)) {
-        idealTaskId.add(combination.getTaskId());
+        idealTaskId.add(combination.getChunkId());
       }
     } while (combination.nextTask());
     this.taskIdMapping = Ints.toArray(idealTaskId);

http://git-wip-us.apache.org/repos/asf/tez/blob/f355a050/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductEdgeManagerReal.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductEdgeManagerReal.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductEdgeManagerReal.java
index 3e1407c..f22035b 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductEdgeManagerReal.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductEdgeManagerReal.java
@@ -18,7 +18,6 @@
 package org.apache.tez.runtime.library.cartesianproduct;
 
 import org.apache.tez.dag.api.EdgeManagerPluginContext;
-import org.apache.tez.dag.api.EdgeManagerPluginOnDemand;
 import org.apache.tez.dag.api.EdgeManagerPluginOnDemand.EventRouteMetadata;
 import org.apache.tez.dag.api.EdgeManagerPluginOnDemand.CompositeEventRouteMetadata;
 

http://git-wip-us.apache.org/repos/asf/tez/blob/f355a050/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductEdgeManagerUnpartitioned.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductEdgeManagerUnpartitioned.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductEdgeManagerUnpartitioned.java
index b9cb155..80d7dc1 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductEdgeManagerUnpartitioned.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductEdgeManagerUnpartitioned.java
@@ -27,7 +27,9 @@ import javax.annotation.Nullable;
 
 class CartesianProductEdgeManagerUnpartitioned extends CartesianProductEdgeManagerReal {
   private int positionId;
-  private int[] numGroups;
+  private int numChunk;
+  private int chunkIdOffset;
+  private int[] numChunkPerSrc;
   private int numDestinationConsumerTasks;
   private Grouper grouper = new Grouper();
 
@@ -36,32 +38,38 @@ class CartesianProductEdgeManagerUnpartitioned extends CartesianProductEdgeManag
   }
 
   public void initialize(CartesianProductEdgeManagerConfig config) {
-    positionId = config.getSourceVertices().indexOf(getContext().getSourceVertexName());
-    this.numGroups = config.getNumGroups();
+    String groupName = getContext().getVertexGroupName();
+    String srcName = groupName != null ? groupName : getContext().getSourceVertexName();
+    this.positionId = config.getSourceVertices().indexOf(srcName);
+    this.numChunkPerSrc = config.numChunksPerSrc;
+    this.numChunk = config.numChunk;
+    this.chunkIdOffset = config.chunkIdOffset;
 
-    if (numGroups != null && numGroups[positionId] != 0) {
-      grouper.init(config.getNumTasks()[positionId], numGroups[positionId]);
+    if (numChunk != 0) {
+      grouper.init(getContext().getSourceVertexNumTasks(), numChunk);
       numDestinationConsumerTasks = 1;
-      for (int numGroup : numGroups) {
+      for (int numGroup : numChunkPerSrc) {
         numDestinationConsumerTasks *= numGroup;
       }
-      numDestinationConsumerTasks /= numGroups[positionId];
+      numDestinationConsumerTasks /= numChunkPerSrc[positionId];
     }
   }
 
   @Override
   public int routeInputErrorEventToSource(int destTaskId, int failedInputId) throws Exception {
     return failedInputId + grouper.getFirstTaskInGroup(
-      CartesianProductCombination.fromTaskId(numGroups, destTaskId).getCombination().get(positionId));
+      CartesianProductCombination.fromTaskId(numChunkPerSrc, destTaskId).getCombination().get(positionId)
+        - chunkIdOffset);
   }
 
   @Override
   public EventRouteMetadata routeDataMovementEventToDestination(int srcTaskId, int srcOutputId,
                                                                 int destTaskId) throws Exception {
-    int groupId =
-      CartesianProductCombination.fromTaskId(numGroups, destTaskId).getCombination().get(positionId);
-    if (grouper.isInGroup(srcTaskId, groupId)) {
-      int idx = srcTaskId - grouper.getFirstTaskInGroup(groupId);
+    int chunkId =
+      CartesianProductCombination.fromTaskId(numChunkPerSrc, destTaskId).getCombination().get(positionId)
+        - chunkIdOffset;
+    if (0 <= chunkId && chunkId < numChunk && grouper.isInGroup(srcTaskId, chunkId)) {
+      int idx = srcTaskId - grouper.getFirstTaskInGroup(chunkId);
       return EventRouteMetadata.create(1, new int[] {idx});
     }
     return null;
@@ -72,10 +80,11 @@ class CartesianProductEdgeManagerUnpartitioned extends CartesianProductEdgeManag
   public CompositeEventRouteMetadata routeCompositeDataMovementEventToDestination(int srcTaskId,
                                                                                   int destTaskId)
     throws Exception {
-    int groupId =
-      CartesianProductCombination.fromTaskId(numGroups, destTaskId).getCombination().get(positionId);
-    if (grouper.isInGroup(srcTaskId, groupId)) {
-      int idx = srcTaskId - grouper.getFirstTaskInGroup(groupId);
+    int chunkId =
+      CartesianProductCombination.fromTaskId(numChunkPerSrc, destTaskId).getCombination().get(positionId)
+        - chunkIdOffset;
+    if (0 <= chunkId && chunkId < numChunk && grouper.isInGroup(srcTaskId, chunkId)) {
+      int idx = srcTaskId - grouper.getFirstTaskInGroup(chunkId);
       return CompositeEventRouteMetadata.create(1, idx, 0);
     }
     return null;
@@ -86,10 +95,11 @@ class CartesianProductEdgeManagerUnpartitioned extends CartesianProductEdgeManag
   public EventRouteMetadata routeInputSourceTaskFailedEventToDestination(int srcTaskId,
                                                                          int destTaskId)
     throws Exception {
-    int groupId =
-      CartesianProductCombination.fromTaskId(numGroups, destTaskId).getCombination().get(positionId);
-    if (grouper.isInGroup(srcTaskId, groupId)) {
-      int idx = srcTaskId - grouper.getFirstTaskInGroup(groupId);
+    int chunkId =
+      CartesianProductCombination.fromTaskId(numChunkPerSrc, destTaskId).getCombination().get(positionId)
+        - chunkIdOffset;
+    if (0 <= chunkId && chunkId < numChunk && grouper.isInGroup(srcTaskId, chunkId)) {
+      int idx = srcTaskId - grouper.getFirstTaskInGroup(chunkId);
       return EventRouteMetadata.create(1, new int[] {idx});
     }
     return null;
@@ -97,9 +107,10 @@ class CartesianProductEdgeManagerUnpartitioned extends CartesianProductEdgeManag
 
   @Override
   public int getNumDestinationTaskPhysicalInputs(int destTaskId) {
-    int groupId =
-      CartesianProductCombination.fromTaskId(numGroups, destTaskId).getCombination().get(positionId);
-    return grouper.getNumTasksInGroup(groupId);
+    int chunkId =
+      CartesianProductCombination.fromTaskId(numChunkPerSrc, destTaskId).getCombination().get(positionId)
+        - chunkIdOffset;
+    return 0 <= chunkId && chunkId < numChunk ? grouper.getNumTasksInGroup(chunkId) : 0;
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/tez/blob/f355a050/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductVertexManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductVertexManager.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductVertexManager.java
index a104904..857f11e 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductVertexManager.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductVertexManager.java
@@ -30,6 +30,8 @@ import org.apache.tez.runtime.api.Event;
 import org.apache.tez.runtime.api.TaskAttemptIdentifier;
 import org.apache.tez.runtime.api.events.VertexManagerEvent;
 
+import java.util.Collections;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -45,6 +47,13 @@ import static org.apache.tez.dag.api.EdgeProperty.DataMovementType.CUSTOM;
  *
  * Predefined parallelism isn't allowed for cartesian product vertex. Parallellism has to be
  * determined by vertex manager.
+ *
+ * If a vertex uses this vertex manager, each of its input edges must be either a cartesian
+ * product edge or a broadcast edge.
+ *
+ * Sources can be either vertices or vertex groups (only in unpartitioned case).
+ *
+ * Slow start only works in partitioned case. Auto grouping only works in unpartitioned case.
  */
 public class CartesianProductVertexManager extends VertexManagerPlugin {
   /**
@@ -95,26 +104,37 @@ public class CartesianProductVertexManager extends VertexManagerPlugin {
     // check whether DAG and config are is consistent
     Map<String, EdgeProperty> edgePropertyMap = getContext().getInputVertexEdgeProperties();
     Set<String> sourceVerticesDAG = edgePropertyMap.keySet();
-    Set<String> sourceVerticesConfig = new HashSet<>();
-    sourceVerticesConfig.addAll(config.getSourceVertices());
+    Set<String> sourceVerticesConfig = new HashSet<>(config.getSourceVertices());
+
+    Map<String, List<String>> vertexGroups = getContext().getInputVertexGroups();
+    Map<String, String> vertexToGroup = new HashMap<>();
+    for (Map.Entry<String, List<String>> group : vertexGroups.entrySet()) {
+      for (String vertex : group.getValue()) {
+        vertexToGroup.put(vertex, group.getKey());
+      }
+    }
 
     for (Map.Entry<String, EdgeProperty> entry : edgePropertyMap.entrySet()) {
       String vertex = entry.getKey();
+      String group = vertexToGroup.get(vertex);
       EdgeProperty edgeProperty = entry.getValue();
       EdgeManagerPluginDescriptor empDescriptor = edgeProperty.getEdgeManagerDescriptor();
       if (empDescriptor != null
         && empDescriptor.getClassName().equals(CartesianProductEdgeManager.class.getName())) {
-        Preconditions.checkArgument(sourceVerticesConfig.contains(vertex),
+        Preconditions.checkArgument(
+          sourceVerticesConfig.contains(vertex) || sourceVerticesConfig.contains(group),
           vertex + " has CartesianProductEdgeManager but isn't in " +
             "CartesianProductVertexManagerConfig");
       } else {
-        Preconditions.checkArgument(!sourceVerticesConfig.contains(vertex),
+        Preconditions.checkArgument(
+          !sourceVerticesConfig.contains(vertex) && !sourceVerticesConfig.contains(group),
           vertex + " has no CartesianProductEdgeManager but is in " +
             "CartesianProductVertexManagerConfig");
       }
 
       if (edgeProperty.getDataMovementType() == CUSTOM) {
-        Preconditions.checkArgument(sourceVerticesConfig.contains(vertex),
+        Preconditions.checkArgument(
+          sourceVerticesConfig.contains(vertex) || sourceVerticesConfig.contains(group),
           "Only broadcast and cartesian product edges are allowed in cartesian product vertex");
       } else {
         Preconditions.checkArgument(edgeProperty.getDataMovementType() == BROADCAST,
@@ -122,14 +142,19 @@ public class CartesianProductVertexManager extends VertexManagerPlugin {
       }
     }
 
-    for (String vertex : sourceVerticesConfig) {
-      Preconditions.checkArgument(sourceVerticesDAG.contains(vertex),
-        vertex + " is in CartesianProductVertexManagerConfig but not a source vertex in DAG");
-      Preconditions.checkArgument(
-        edgePropertyMap.get(vertex).getEdgeManagerDescriptor().getClassName()
-          .equals(CartesianProductEdgeManager.class.getName()),
-        vertex + " is in CartesianProductVertexManagerConfig and a source vertex, but has no " +
-          "CartesianProductEdgeManager");
+    for (String src : sourceVerticesConfig) {
+      List<String> vertices =
+        vertexGroups.containsKey(src) ? vertexGroups.get(src) : Collections.singletonList(src);
+      for (String v : vertices) {
+        Preconditions.checkArgument(
+          sourceVerticesDAG.contains(v),
+          v + " is in CartesianProductVertexManagerConfig but not a source vertex in DAG");
+        Preconditions.checkArgument(
+          edgePropertyMap.get(v).getEdgeManagerDescriptor().getClassName()
+            .equals(CartesianProductEdgeManager.class.getName()),
+          v + " is in CartesianProductVertexManagerConfig and a source vertex, but has no " +
+            "CartesianProductEdgeManager");
+      }
     }
 
     vertexManagerReal = config.getIsPartitioned()

http://git-wip-us.apache.org/repos/asf/tez/blob/f355a050/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductVertexManagerConfig.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductVertexManagerConfig.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductVertexManagerConfig.java
index f43f494..e082ec3 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductVertexManagerConfig.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductVertexManagerConfig.java
@@ -28,32 +28,24 @@ import java.nio.ByteBuffer;
 import static org.apache.tez.runtime.library.cartesianproduct.CartesianProductUserPayload.*;
 
 class CartesianProductVertexManagerConfig extends CartesianProductConfig {
-  private final float minFraction;
-  private final float maxFraction;
-  private final boolean enableAutoGrouping;
-  private final long desiredBytesPerGroup;
+  final float minFraction;
+  final float maxFraction;
+  final boolean enableAutoGrouping;
+  final long desiredBytesPerChunk;
 
-  public CartesianProductVertexManagerConfig(boolean isPartitioned, String[] sourceVertices,
+  public CartesianProductVertexManagerConfig(boolean isPartitioned, String[] sources,
                                              int[] numPartitions,
                                              float minFraction, float maxFraction,
-                                             boolean enableAutoGrouping, long desiredBytesPerGroup,
+                                             boolean enableAutoGrouping, long desiredBytesPerChunk,
                                              CartesianProductFilterDescriptor filterDescriptor) {
-    super(isPartitioned, numPartitions, sourceVertices, filterDescriptor);
+    super(isPartitioned, numPartitions, sources, filterDescriptor);
     Preconditions.checkArgument(minFraction <= maxFraction,
       "min fraction(" + minFraction + ") should be less than max fraction(" +
         maxFraction  + ") in cartesian product slow start");
     this.minFraction = minFraction;
     this.maxFraction = maxFraction;
     this.enableAutoGrouping = enableAutoGrouping;
-    this.desiredBytesPerGroup = desiredBytesPerGroup;
-  }
-
-  public float getMinFraction() {
-    return minFraction;
-  }
-
-  public float getMaxFraction() {
-    return maxFraction;
+    this.desiredBytesPerChunk = desiredBytesPerChunk;
   }
 
   public static CartesianProductVertexManagerConfig fromUserPayload(UserPayload payload)
@@ -62,8 +54,8 @@ class CartesianProductVertexManagerConfig extends CartesianProductConfig {
       CartesianProductConfigProto.parseFrom(ByteString.copyFrom(payload.getPayload()));
 
     boolean isPartitioned = proto.getIsPartitioned();
-    String[] sourceVertices = new String[proto.getSourceVerticesList().size()];
-    proto.getSourceVerticesList().toArray(sourceVertices);
+    String[] sources = new String[proto.getSourcesList().size()];
+    proto.getSourcesList().toArray(sources);
     int[] numPartitions =
       proto.getNumPartitionsCount() == 0 ? null : Ints.toArray(proto.getNumPartitionsList());
     CartesianProductFilterDescriptor filterDescriptor = proto.hasFilterClassName()
@@ -77,17 +69,9 @@ class CartesianProductVertexManagerConfig extends CartesianProductConfig {
 
     boolean enableAutoGrouping = proto.hasEnableAutoGrouping() ? proto.getEnableAutoGrouping()
       : CartesianProductVertexManager.TEZ_CARTESIAN_PRODUCT_ENABLE_AUTO_GROUPING_DEFAULT;
-    long desiredBytesPerGroup = proto.hasDesiredBytesPerGroup() ? proto.getDesiredBytesPerGroup()
+    long desiredBytesPerGroup = proto.hasDesiredBytesPerChunk() ? proto.getDesiredBytesPerChunk()
       : CartesianProductVertexManager.TEZ_CARTESIAN_PRODUCT_DESIRED_BYTES_PER_GROUP_DEFAULT;
-    return new CartesianProductVertexManagerConfig(isPartitioned, sourceVertices, numPartitions,
+    return new CartesianProductVertexManagerConfig(isPartitioned, sources, numPartitions,
       minFraction, maxFraction, enableAutoGrouping, desiredBytesPerGroup, filterDescriptor);
   }
-
-  public boolean isEnableAutoGrouping() {
-    return enableAutoGrouping;
-  }
-
-  public long getDesiredBytesPerGroup() {
-    return desiredBytesPerGroup;
-  }
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/f355a050/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductVertexManagerPartitioned.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductVertexManagerPartitioned.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductVertexManagerPartitioned.java
index 85c04d2..ddff37d 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductVertexManagerPartitioned.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductVertexManagerPartitioned.java
@@ -174,12 +174,12 @@ class CartesianProductVertexManagerPartitioned extends CartesianProductVertexMan
     // determine the destination task with largest id to schedule
     float percentFinishedSrcTask = numFinishedSrcTasks*1f/totalNumSrcTasks;
     int numTaskToSchedule;
-    if (percentFinishedSrcTask < config.getMinFraction()) {
+    if (percentFinishedSrcTask < config.minFraction) {
       numTaskToSchedule = 0;
-    } else if (config.getMinFraction() <= percentFinishedSrcTask &&
-        percentFinishedSrcTask <= config.getMaxFraction()) {
-      numTaskToSchedule = (int) ((percentFinishedSrcTask-config.getMinFraction())
-        /(config.getMaxFraction()-config.getMinFraction())*parallelism);
+    } else if (config.minFraction <= percentFinishedSrcTask &&
+        percentFinishedSrcTask <= config.maxFraction) {
+      numTaskToSchedule = (int) ((percentFinishedSrcTask-config.minFraction)
+        /(config.maxFraction-config.minFraction)*parallelism);
     } else {
       numTaskToSchedule = parallelism;
     }

http://git-wip-us.apache.org/repos/asf/tez/blob/f355a050/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductVertexManagerUnpartitioned.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductVertexManagerUnpartitioned.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductVertexManagerUnpartitioned.java
index 993cb40..46ea76e 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductVertexManagerUnpartitioned.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductVertexManagerUnpartitioned.java
@@ -19,7 +19,6 @@ package org.apache.tez.runtime.library.cartesianproduct;
 
 import com.google.common.primitives.Ints;
 import com.google.protobuf.ByteString;
-import org.apache.tez.dag.api.EdgeManagerPluginDescriptor;
 import org.apache.tez.dag.api.EdgeProperty;
 import org.apache.tez.dag.api.UserPayload;
 import org.apache.tez.dag.api.VertexManagerPluginContext;
@@ -36,7 +35,6 @@ import org.slf4j.Logger;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -50,31 +48,132 @@ import java.util.Set;
 import static org.apache.tez.dag.api.EdgeProperty.DataMovementType.CUSTOM;
 import static org.apache.tez.runtime.library.cartesianproduct.CartesianProductUserPayload.CartesianProductConfigProto;
 
+/**
+ * In the unpartitioned case, there is one destination task for each combination of source chunks.
+ * A source is a source vertex or a source vertex group. A chunk is one source task (without auto
+ * grouping) or a group of source tasks (with auto grouping). A chunk may contain multiple tasks
+ * across vertices. The mapping from source chunks to destination task id is done by
+ * {@link CartesianProductCombination}.
+ *
+ * If auto grouping is enabled, this vertex manager estimates the output size of each source and
+ * groups the tasks of each source into chunks according to the desired chunk size set by the user.
+ *
+ *
+ */
 class CartesianProductVertexManagerUnpartitioned extends CartesianProductVertexManagerReal {
+  /**
+   * a cartesian product source
+   */
+  static class Source {
+    // list of source vertices of this source
+    List<SrcVertex> srcVertices = new ArrayList<>();
+    // position of this source in all sources
+    int position;
+    // name of source vertex or vertex group
+    String name;
+
+    // total number of chunks in this source
+    public int getNumChunk() {
+      int numChunk = 0;
+      for (SrcVertex srcV : srcVertices) {
+        numChunk += srcV.numChunk;
+      }
+      return numChunk;
+    }
+
+    // whether this source has any task completed
+    public boolean hasTaskCompleted() {
+      for (SrcVertex srcV : srcVertices) {
+        if (!srcV.taskCompleted.isEmpty()) {
+          return true;
+        }
+      }
+      return false;
+    }
+
+    public String toString(boolean afterReconfigure) {
+      StringBuilder sb = new StringBuilder();
+      sb.append("Source at position ");
+      sb.append(position);
+      if (name != null) {
+        sb.append(", ");
+        sb.append("vertex group ");
+        sb.append(name);
+
+      }
+      sb.append(": {");
+      for (SrcVertex srcV : srcVertices) {
+        sb.append("[");
+        sb.append(srcV.toString(afterReconfigure));
+        sb.append("], ");
+      }
+      sb.deleteCharAt(sb.length() - 1);
+      sb.setCharAt(sb.length() - 1, '}');
+      return sb.toString();
+    }
+  }
+
+  /**
+   * a cartesian product source vertex
+   */
+  class SrcVertex {
+    // which source this vertex belongs to
+    Source source;
+    // vertex name
+    String name;
+    int numTask;
+    // num chunks of this source vertex
+    int numChunk;
+    // offset of chunk id in vertex group
+    // chunks in a vertex group are numbered sequentially so they look like they come from one vertex
+    int chunkIdOffset = 0;
+    RoaringBitmap taskCompleted = new RoaringBitmap();
+    RoaringBitmap taskWithVMEvent = new RoaringBitmap();
+    long outputBytes;
+
+    public void doGrouping() {
+      numChunk = numTask;
+      if (config.enableAutoGrouping) {
+        outputBytes = outputBytes * numTask / taskWithVMEvent.getCardinality();
+        numChunk = Math.min(numChunk,
+          (int) ((outputBytes + config.desiredBytesPerChunk - 1) / config.desiredBytesPerChunk));
+      }
+    }
+
+    public String toString(boolean afterReconfigure) {
+      StringBuilder sb = new StringBuilder();
+      sb.append("vertex ").append(name).append(", ");
+      if (afterReconfigure) {
+        sb.append("estimated output ").append(outputBytes).append(" bytes, ");
+        sb.append(numChunk).append(" chunks");
+      } else {
+        sb.append(numTask).append(" tasks, ");
+        sb.append(taskWithVMEvent.getCardinality()).append(" VMEvents, ");
+        sb.append("output ").append(outputBytes).append(" bytes");
+      }
+      return sb.toString();
+    }
+  }
+
   private static final Logger LOG =
     org.slf4j.LoggerFactory.getLogger(CartesianProductVertexManagerUnpartitioned.class);
 
-  List<String> sourceVertices;
-  private int parallelism = 1;
+  CartesianProductVertexManagerConfig config;
+  Map<String, Source> sourcesByName = new HashMap<>();
+  Map<String, SrcVertex> srcVerticesByName = new HashMap<>();
+
   private boolean vertexReconfigured = false;
   private boolean vertexStarted = false;
   private boolean vertexStartSchedule = false;
   private int numCPSrcNotInConfigureState = 0;
   private int numBroadcastSrcNotInRunningState = 0;
-  private int[] numTasks;
-
   private Queue<TaskAttemptIdentifier> completedSrcTaskToProcess = new LinkedList<>();
-  private Map<String, RoaringBitmap> sourceTaskCompleted = new HashMap<>();
   private RoaringBitmap scheduledTasks = new RoaringBitmap();
-  private CartesianProductConfig config;
 
   /* auto reduce related */
-  private int[] numGroups;
+  // num of chunks of source at the corresponding position in source list
+  private int[] numChunksPerSrc;
   private Set<String> vertexSentVME = new HashSet<>();
-  private long[] vertexOutputBytes;
-  private int[] numVertexManagerEventsReceived;
-  private long desiredBytesPerGroup;
-  private boolean enableGrouping;
   private Grouper grouper = new Grouper();
 
   public CartesianProductVertexManagerUnpartitioned(VertexManagerPluginContext context) {
@@ -83,29 +182,53 @@ class CartesianProductVertexManagerUnpartitioned extends CartesianProductVertexM
 
   @Override
   public void initialize(CartesianProductVertexManagerConfig config) throws Exception {
-    sourceVertices = config.getSourceVertices();
-    numTasks = new int[sourceVertices.size()];
-    numGroups = new int[sourceVertices.size()];
-    vertexOutputBytes = new long[sourceVertices.size()];
-    numVertexManagerEventsReceived = new int[sourceVertices.size()];
-
-    enableGrouping = config.isEnableAutoGrouping();
-    desiredBytesPerGroup = config.getDesiredBytesPerGroup();
-
-    for (String vertex : sourceVertices) {
-      sourceTaskCompleted.put(vertex, new RoaringBitmap());
-    }
-
-    for (String vertex : getContext().getInputVertexEdgeProperties().keySet()) {
-      if (sourceVertices.indexOf(vertex) != -1) {
-        sourceTaskCompleted.put(vertex, new RoaringBitmap());
-        getContext().registerForVertexStateUpdates(vertex, EnumSet.of(VertexState.CONFIGURED));
+    // Inputs arriving over a CartesianProductEdgeManager edge are cartesian product source
+    // vertices and are tracked until CONFIGURED; every other input only has to reach RUNNING.
+    for (Map.Entry<String, EdgeProperty> e : getContext().getInputVertexEdgeProperties().entrySet()) {
+      if (e.getValue().getDataMovementType() == CUSTOM
+        && e.getValue().getEdgeManagerDescriptor().getClassName()
+          .equals(CartesianProductEdgeManager.class.getName())) {
+        srcVerticesByName.put(e.getKey(), new SrcVertex());
+        srcVerticesByName.get(e.getKey()).name = e.getKey();
+        getContext().registerForVertexStateUpdates(e.getKey(), EnumSet.of(VertexState.CONFIGURED));
         numCPSrcNotInConfigureState++;
       } else {
-        getContext().registerForVertexStateUpdates(vertex, EnumSet.of(VertexState.RUNNING));
+        getContext().registerForVertexStateUpdates(e.getKey(), EnumSet.of(VertexState.RUNNING));
         numBroadcastSrcNotInRunningState++;
       }
     }
+
+    // Build one Source per configured cartesian product source, in config order. A source is
+    // either a single vertex or a vertex group; all members of a group share one Source.
+    Map<String, List<String>> srcGroups = getContext().getInputVertexGroups();
+    List<String> configuredSources = config.getSourceVertices();
+    for (int i = 0; i < configuredSources.size(); i++) {
+      String srcName = configuredSources.get(i);
+      Source source = new Source();
+      source.position = i;
+      List<String> memberNames = new ArrayList<>();
+      if (srcGroups.containsKey(srcName)) {
+        source.name = srcName;
+        memberNames.addAll(srcGroups.get(srcName));
+      } else {
+        memberNames.add(srcName);
+      }
+      for (String srcVName : memberNames) {
+        SrcVertex srcV = srcVerticesByName.get(srcVName);
+        if (srcV == null) {
+          // fail fast with context rather than an NPE on the assignment below
+          throw new IllegalStateException("Cartesian product source " + srcName
+            + " refers to vertex " + srcVName + ", which has no "
+            + CartesianProductEdgeManager.class.getSimpleName() + " input edge");
+        }
+        source.srcVertices.add(srcV);
+        srcV.source = source;
+      }
+      sourcesByName.put(srcName, source);
+    }
+
+    // indexed by Source.position, i.e. by the order of sources in the config
+    numChunksPerSrc = new int[configuredSources.size()];
     this.config = config;
     getContext().vertexReconfigurationPlanned();
   }
@@ -128,7 +237,8 @@ class CartesianProductVertexManagerUnpartitioned extends CartesianProductVertexM
     VertexState state = stateUpdate.getVertexState();
 
     if (state == VertexState.CONFIGURED) {
-      numTasks[sourceVertices.indexOf(vertex)] = getContext().getVertexNumTasks(vertex);
+      // only cartesian product source vertices are registered for CONFIGURED updates
+      srcVerticesByName.get(vertex).numTask = getContext().getVertexNumTasks(vertex);
       numCPSrcNotInConfigureState--;
     } else if (state == VertexState.RUNNING) {
       numBroadcastSrcNotInRunningState--;
@@ -145,22 +254,22 @@ class CartesianProductVertexManagerUnpartitioned extends CartesianProductVertexM
   private void addCompletedSrcTaskToProcess(TaskAttemptIdentifier attempt) {
     int taskId = attempt.getTaskIdentifier().getIdentifier();
     String vertex = attempt.getTaskIdentifier().getVertexIdentifier().getName();
-    if (sourceVertices.indexOf(vertex) == -1) {
-      return;
-    }
-    if (sourceTaskCompleted.get(vertex).contains(taskId)) {
-      return;
+    // only tasks of cartesian product source vertices are queued, each task id at most once
+    SrcVertex srcV = srcVerticesByName.get(vertex);
+    if (srcV != null && !srcV.taskCompleted.contains(taskId)) {
+      srcV.taskCompleted.add(taskId);
+      completedSrcTaskToProcess.add(attempt);
     }
-    sourceTaskCompleted.get(vertex).add(taskId);
-    completedSrcTaskToProcess.add(attempt);
   }
 
   private boolean tryStartSchedule() {
     if (!vertexReconfigured || !vertexStarted || numBroadcastSrcNotInRunningState > 0) {
       return false;
     }
-    for (RoaringBitmap bitmap: sourceTaskCompleted.values()) {
-      if (bitmap.isEmpty()) {
+
+    // scheduling waits until every source reports a completed task (Source.hasTaskCompleted)
+    for (Source src : sourcesByName.values()) {
+      if (!src.hasTaskCompleted()) {
         return false;
       }
     }
@@ -178,15 +285,22 @@ class CartesianProductVertexManagerUnpartitioned extends CartesianProductVertexM
     if (vmEvent.getUserPayload() != null) {
       String srcVertex =
         vmEvent.getProducerAttemptIdentifier().getTaskIdentifier().getVertexIdentifier().getName();
-      int position = sourceVertices.indexOf(srcVertex);
+      SrcVertex srcV = srcVerticesByName.get(srcVertex);
+
       // vmEvent from non-cp vertex doesn't matter
-      if (position == -1) {
+      if (srcV == null) {
         return;
       }
+
       VertexManagerEventPayloadProto proto =
         VertexManagerEventPayloadProto.parseFrom(ByteString.copyFrom(vmEvent.getUserPayload()));
-      vertexOutputBytes[position] += proto.getOutputSize();
-      numVertexManagerEventsReceived[position]++;
+      int srcTaskId = vmEvent.getProducerAttemptIdentifier().getTaskIdentifier().getIdentifier();
+      // taskWithVMEvent is keyed by task id, so a later attempt of an already-counted task must
+      // not add its bytes again; otherwise outputBytes and the distinct task count disagree
+      if (!srcV.taskWithVMEvent.contains(srcTaskId)) {
+        srcV.taskWithVMEvent.add(srcTaskId);
+        srcV.outputBytes += proto.getOutputSize();
+      }
       vertexSentVME.add(srcVertex);
     }
 
@@ -197,60 +306,71 @@ class CartesianProductVertexManagerUnpartitioned extends CartesianProductVertexM
     if (numCPSrcNotInConfigureState > 0) {
       return false;
     }
-    if (enableGrouping) {
-      if (vertexSentVME.size() != sourceVertices.size()) {
+    if (config.enableAutoGrouping) {
+      if (vertexSentVME.size() != srcVerticesByName.size()) {
         return false;
       }
-      for (int i = 0; i < vertexOutputBytes.length; i++) {
-        if (vertexOutputBytes[i] < desiredBytesPerGroup
-          && numVertexManagerEventsReceived[i] < numTasks[i]) {
+      // wait until every src vertex has either reported at least one chunk's worth of output
+      // or received a VM event from each of its tasks
+      for (SrcVertex srcV : srcVerticesByName.values()) {
+        if (srcV.outputBytes < config.desiredBytesPerChunk
+          && srcV.taskWithVMEvent.getCardinality() < srcV.numTask) {
           return false;
         }
       }
     }
 
-    LOG.info("Start reconfigure, grouping: " + enableGrouping
-      + ", group size: " + desiredBytesPerGroup);
-    LOG.info("src vertices: " + sourceVertices);
-    LOG.info("number of source tasks in each src: " + Arrays.toString(numTasks));
-    LOG.info("number of vmEvent from each src: "
-      + Arrays.toString(numVertexManagerEventsReceived));
-    LOG.info("output stats of each src: " + Arrays.toString(vertexOutputBytes));
-
-    for (int i = 0; i < numTasks.length; i++) {
-      if (enableGrouping) {
-        vertexOutputBytes[i] =
-          vertexOutputBytes[i] * numTasks[i] / numVertexManagerEventsReceived[i];
-        int desiredNumGroup =
-          (int) ((vertexOutputBytes[i] + desiredBytesPerGroup - 1) / desiredBytesPerGroup);
-        numGroups[i] = Math.min(numTasks[i], desiredNumGroup);
-      } else {
-        numGroups[i] = numTasks[i];
+    LOG.info("Start reconfigure, grouping: " + config.enableAutoGrouping
+      + ", chunk size: " + config.desiredBytesPerChunk + " bytes.");
+    for (String srcName : config.getSourceVertices()) {
+      LOG.info(sourcesByName.get(srcName).toString(false));
+    }
+
+    // group each member vertex into chunks; within a source, members get consecutive,
+    // non-overlapping chunk id ranges [chunkIdOffset, chunkIdOffset + numChunk)
+    for (Source src : sourcesByName.values()) {
+      for (int i = 0; i < src.srcVertices.size(); i++) {
+        src.srcVertices.get(i).doGrouping();
+        if (i > 0) {
+          // the offset must accumulate all earlier members' chunks, not only the previous
+          // member's count; otherwise groups of 3+ vertices get overlapping chunk id ranges
+          SrcVertex prev = src.srcVertices.get(i - 1);
+          src.srcVertices.get(i).chunkIdOffset = prev.chunkIdOffset + prev.numChunk;
+        }
       }
-      parallelism *= numGroups[i];
+      numChunksPerSrc[src.position] = src.getNumChunk();
     }
 
-    LOG.info("estimated output size of each src: " + Arrays.toString(vertexOutputBytes));
-    LOG.info("number of groups for each src: " + Arrays.toString(numGroups));
+    int parallelism = 1;
+    for (Source src : sourcesByName.values()) {
+      parallelism *= src.getNumChunk();
+    }
+
+    LOG.info("After reconfigure, ");
+    for (String srcName : config.getSourceVertices()) {
+      LOG.info(sourcesByName.get(srcName).toString(true));
+    }
     LOG.info("Final parallelism: " + parallelism);
 
-    UserPayload payload = null;
+    // fields shared by every cartesian product edge; per-edge chunk info is set in the loop below
+    CartesianProductConfigProto.Builder builder = CartesianProductConfigProto.newBuilder();
+    builder.setIsPartitioned(false).addAllSources(config.getSourceVertices())
+      .addAllNumChunks(Ints.asList(this.numChunksPerSrc));
+
     Map<String, EdgeProperty> edgeProperties = getContext().getInputVertexEdgeProperties();
     Iterator<Map.Entry<String,EdgeProperty>> iter = edgeProperties.entrySet().iterator();
     while (iter.hasNext()) {
-      EdgeProperty edgeProperty = iter.next().getValue();
-      if (edgeProperty.getDataMovementType() != CUSTOM) {
+      Map.Entry<String, EdgeProperty> e = iter.next();
+      // only cartesian product edges are reconfigured; any other input edge (including a CUSTOM
+      // edge using a different edge manager) is removed from the map passed to reconfigureVertex
+      SrcVertex srcV = srcVerticesByName.get(e.getKey());
+      if (srcV == null) {
         iter.remove();
-        continue;
-      }
-      EdgeManagerPluginDescriptor descriptor = edgeProperty.getEdgeManagerDescriptor();
-      if (payload == null) {
-        CartesianProductConfigProto.Builder builder = CartesianProductConfigProto.newBuilder();
-        builder.setIsPartitioned(false).addAllNumTasks(Ints.asList(numTasks))
-          .addAllNumGroups(Ints.asList(numGroups)).addAllSourceVertices(config.getSourceVertices());
-        payload = UserPayload.create(ByteBuffer.wrap(builder.build().toByteArray()));
+      } else {
+        builder.setNumChunk(srcV.numChunk).setChunkIdOffset(srcV.chunkIdOffset);
+        e.getValue().getEdgeManagerDescriptor()
+          .setUserPayload(UserPayload.create(ByteBuffer.wrap(builder.build().toByteArray())));
       }
-      descriptor.setUserPayload(payload);
     }
     getContext().reconfigureVertex(parallelism, null, edgeProperties);
     vertexReconfigured = true;
@@ -267,32 +381,50 @@ class CartesianProductVertexManagerUnpartitioned extends CartesianProductVertexM
     }
 
     while (!completedSrcTaskToProcess.isEmpty()) {
-      scheduledTasksDependOnCompletion(completedSrcTaskToProcess.poll());
+      scheduleTasksDependOnCompletion(completedSrcTaskToProcess.poll());
     }
   }
 
-  private void scheduledTasksDependOnCompletion(TaskAttemptIdentifier attempt) {
+  private void scheduleTasksDependOnCompletion(TaskAttemptIdentifier attempt) {
     int taskId = attempt.getTaskIdentifier().getIdentifier();
     String vertex = attempt.getTaskIdentifier().getVertexIdentifier().getName();
-    int position = sourceVertices.indexOf(vertex);
+    SrcVertex srcV = srcVerticesByName.get(vertex);
+    Source src = srcV.source;
 
     List<ScheduleTaskRequest> requests = new ArrayList<>();
     CartesianProductCombination combination =
-      new CartesianProductCombination(numGroups, position);
-    grouper.init(numTasks[position], numGroups[position]);
-    combination.firstTaskWithFixedPartition(grouper.getGroupId(taskId));
+      new CartesianProductCombination(numChunksPerSrc, src.position);
+    grouper.init(srcV.numTask, srcV.numChunk);
+    combination.firstTaskWithFixedChunk(grouper.getGroupId(taskId) + srcV.chunkIdOffset);
+    // visit every combination whose coordinate for this source is the completed task's chunk
     do {
       List<Integer> list = combination.getCombination();
 
-      if (scheduledTasks.contains(combination.getTaskId())) {
+      if (scheduledTasks.contains(combination.getChunkId())) {
         continue;
       }
       boolean readyToSchedule = true;
       for (int i = 0; i < list.size(); i++) {
-        int group = list.get(i);
-        grouper.init(numTasks[i], numGroups[i]);
-        for (int j = grouper.getFirstTaskInGroup(group); j <= grouper.getLastTaskInGroup(group); j++) {
-          if (!sourceTaskCompleted.get(sourceVertices.get(i)).contains(j)) {
+        int chunkId = list.get(i);
+        // find the member vertex of this source whose chunk id range
+        // [chunkIdOffset, chunkIdOffset + numChunk) contains chunkId
+        SrcVertex srcVHasGroup = null;
+        for (SrcVertex v : sourcesByName.get(config.getSourceVertices().get(i)).srcVertices) {
+          if (v.chunkIdOffset <= chunkId && chunkId < v.chunkIdOffset + v.numChunk) {
+            srcVHasGroup = v;
+            break;
+          }
+        }
+        if (srcVHasGroup == null) {
+          // explicit check: Java asserts are disabled by default, which would leave an NPE below
+          throw new IllegalStateException("No vertex of source " + config.getSourceVertices().get(i)
+            + " owns chunk " + chunkId);
+        }
+        grouper.init(srcVHasGroup.numTask, srcVHasGroup.numChunk);
+        // convert to the chunk id local to that vertex before mapping it back to its task ids
+        chunkId -= srcVHasGroup.chunkIdOffset;
+        for (int j = grouper.getFirstTaskInGroup(chunkId); j <= grouper.getLastTaskInGroup(chunkId); j++) {
+          if (!srcVHasGroup.taskCompleted.contains(j)) {
             readyToSchedule = false;
             break;
           }
@@ -303,10 +427,11 @@ class CartesianProductVertexManagerUnpartitioned extends CartesianProductVertexM
       }
 
       if (readyToSchedule) {
-        requests.add(ScheduleTaskRequest.create(combination.getTaskId(), null));
-        scheduledTasks.add(combination.getTaskId());
+        // the combination's id is used directly as the index of the task to schedule
+        requests.add(ScheduleTaskRequest.create(combination.getChunkId(), null));
+        scheduledTasks.add(combination.getChunkId());
       }
-    } while (combination.nextTaskWithFixedPartition());
+    } while (combination.nextTaskWithFixedChunk());
     if (!requests.isEmpty()) {
       getContext().scheduleTasks(requests);
     }

http://git-wip-us.apache.org/repos/asf/tez/blob/f355a050/tez-runtime-library/src/main/proto/CartesianProductPayload.proto
----------------------------------------------------------------------
Review note: tags 2, 9 and 10 keep their numbers under new names, tag 10 now carries per-source chunk counts instead of per-source task counts, and tag 11 changes from repeated numGroups to optional numChunk; a payload serialized with the old schema decodes with a different meaning under this one.
diff --git a/tez-runtime-library/src/main/proto/CartesianProductPayload.proto b/tez-runtime-library/src/main/proto/CartesianProductPayload.proto
index dd7d06f..cb503ea 100644
--- a/tez-runtime-library/src/main/proto/CartesianProductPayload.proto
+++ b/tez-runtime-library/src/main/proto/CartesianProductPayload.proto
@@ -21,14 +21,15 @@ option java_outer_classname = "CartesianProductUserPayload";
 
 message CartesianProductConfigProto {
     required bool isPartitioned = 1;
-    repeated string sourceVertices = 2;
+    repeated string sources = 2;
     repeated int32 numPartitions = 3;
     optional string filterClassName = 4;
     optional bytes filterUserPayload = 5;
     optional float minFraction = 6;
     optional float maxFraction = 7;
     optional bool enableAutoGrouping = 8;
-    optional int64 desiredBytesPerGroup = 9;
-    repeated int32 numTasks = 10;
-    repeated int32 numGroups = 11;
+    optional int64 desiredBytesPerChunk = 9;
+    repeated int32 numChunks = 10;
+    optional int32 numChunk = 11;
+    optional int32 chunkIdOffset = 12;
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tez/blob/f355a050/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManagerUtils.java
----------------------------------------------------------------------
Review note: the anonymous test context gains an overridden getVertexGroupName() returning null (no vertex group); presumably this implements the method this commit adds to EdgeManagerPluginContext - confirm against that interface.
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManagerUtils.java b/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManagerUtils.java
index 9a5614e..439d650 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManagerUtils.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManagerUtils.java
@@ -331,6 +331,11 @@ public class TestShuffleVertexManagerUtils {
           public int getDestinationVertexNumTasks() {
             return numTasks;
           }
+
+          @Override
+          public String getVertexGroupName() {
+            return null;
+          }
         };
         if (newEdgeManagers != null) {
           EdgeManagerPlugin edgeManager = ReflectionUtils

http://git-wip-us.apache.org/repos/asf/tez/blob/f355a050/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductCombination.java
----------------------------------------------------------------------
Review note: rename-only update (getTaskId -> getChunkId, first/nextTaskWithFixedPartition -> first/nextTaskWithFixedChunk); every expected combination and id is unchanged. The unchanged assertTrue(false) in testRejectZero would read better as fail(...).
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductCombination.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductCombination.java
index 06d3e90..3755ac8 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductCombination.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductCombination.java
@@ -30,44 +30,44 @@ import static org.junit.Assert.assertTrue;
 public class TestCartesianProductCombination {
   private void verifyCombination(CartesianProductCombination combination, int[] result, int taskId) {
     assertArrayEquals(result, Ints.toArray(combination.getCombination()));
-    assertEquals(taskId, combination.getTaskId());
+    assertEquals(taskId, combination.getChunkId());
   }
 
   private void testCombinationTwoWayVertex0() {
     CartesianProductCombination combination = new CartesianProductCombination(new int[]{2,3}, 0);
 
-    combination.firstTaskWithFixedPartition(1);
+    combination.firstTaskWithFixedChunk(1);
     verifyCombination(combination, new int[]{1,0}, 3);
-    assertTrue(combination.nextTaskWithFixedPartition());
+    assertTrue(combination.nextTaskWithFixedChunk());
     verifyCombination(combination, new int[]{1,1}, 4);
-    assertTrue(combination.nextTaskWithFixedPartition());
+    assertTrue(combination.nextTaskWithFixedChunk());
     verifyCombination(combination, new int[]{1,2}, 5);
-    assertFalse(combination.nextTaskWithFixedPartition());
+    assertFalse(combination.nextTaskWithFixedChunk());
   }
 
   private void testCombinationTwoWayVertex1() {
     CartesianProductCombination combination = new CartesianProductCombination(new int[]{2,3}, 1);
 
-    combination.firstTaskWithFixedPartition(1);
+    combination.firstTaskWithFixedChunk(1);
     verifyCombination(combination, new int[]{0,1}, 1);
-    assertTrue(combination.nextTaskWithFixedPartition());
+    assertTrue(combination.nextTaskWithFixedChunk());
     verifyCombination(combination, new int[]{1,1}, 4);
 
-    assertFalse(combination.nextTaskWithFixedPartition());
+    assertFalse(combination.nextTaskWithFixedChunk());
   }
 
   private void testCombinationThreeWay() {
     CartesianProductCombination combination = new CartesianProductCombination(new int[]{2,2,2}, 1);
 
-    combination.firstTaskWithFixedPartition(1);
+    combination.firstTaskWithFixedChunk(1);
     verifyCombination(combination, new int[]{0,1,0}, 2);
-    assertTrue(combination.nextTaskWithFixedPartition());
+    assertTrue(combination.nextTaskWithFixedChunk());
     verifyCombination(combination, new int[]{0,1,1}, 3);
-    assertTrue(combination.nextTaskWithFixedPartition());
+    assertTrue(combination.nextTaskWithFixedChunk());
     verifyCombination(combination, new int[]{1,1,0}, 6);
-    assertTrue(combination.nextTaskWithFixedPartition());
+    assertTrue(combination.nextTaskWithFixedChunk());
     verifyCombination(combination, new int[]{1,1,1}, 7);
-    assertFalse(combination.nextTaskWithFixedPartition());
+    assertFalse(combination.nextTaskWithFixedChunk());
   }
 
   @Test(timeout = 5000)
@@ -110,9 +110,9 @@ public class TestCartesianProductCombination {
 
   @Test(timeout = 5000)
   public void testRejectZero() {
-    int[] numTasks = new int[] {0 ,1};
+    int[] numChunk = new int[] {0 ,1};
     try {
-      new CartesianProductCombination(numTasks);
+      new CartesianProductCombination(numChunk);
       assertTrue(false);
     } catch (Exception ignored) {}
   }

http://git-wip-us.apache.org/repos/asf/tez/blob/f355a050/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductConfig.java
----------------------------------------------------------------------
Review note: follows the proto rename desiredBytesPerGroup -> desiredBytesPerChunk while still setting the TEZ_CARTESIAN_PRODUCT_DESIRED_BYTES_PER_GROUP key. The unchanged context comment "auto groupinig conf not set" on the second case is misspelled and says the opposite of what that case does (both grouping confs are set there).
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductConfig.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductConfig.java
index c9e49a3..4857749 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductConfig.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductConfig.java
@@ -122,15 +122,15 @@ public class TestCartesianProductConfig {
     // auto grouping conf not set
     CartesianProductConfigProto proto = config.toProto(conf);
     assertFalse(proto.hasEnableAutoGrouping());
-    assertFalse(proto.hasDesiredBytesPerGroup());
+    assertFalse(proto.hasDesiredBytesPerChunk());
 
     // auto groupinig conf not set
     conf.setBoolean(CartesianProductVertexManager.TEZ_CARTESIAN_PRODUCT_ENABLE_AUTO_GROUPING, true);
     conf.setLong(CartesianProductVertexManager.TEZ_CARTESIAN_PRODUCT_DESIRED_BYTES_PER_GROUP, 1000);
     proto = config.toProto(conf);
     assertTrue(proto.hasEnableAutoGrouping());
-    assertTrue(proto.hasDesiredBytesPerGroup());
+    assertTrue(proto.hasDesiredBytesPerChunk());
     assertEquals(true, proto.getEnableAutoGrouping());
-    assertEquals(1000, proto.getDesiredBytesPerGroup());
+    assertEquals(1000, proto.getDesiredBytesPerChunk());
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tez/blob/f355a050/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductEdgeManager.java
----------------------------------------------------------------------
Review note: builder calls follow the proto renames (sourceVertices -> sources, numTasks -> numChunks); the unpartitioned case now supplies per-source chunk counts instead of per-source task counts.
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductEdgeManager.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductEdgeManager.java
index 12aee3b..d722932 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductEdgeManager.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductEdgeManager.java
@@ -40,7 +40,7 @@ public class TestCartesianProductEdgeManager {
     // partitioned case
     CartesianProductConfigProto.Builder builder = CartesianProductConfigProto.newBuilder();
     builder.setIsPartitioned(true)
-      .addAllSourceVertices(Arrays.asList("v0", "v1"))
+      .addAllSources(Arrays.asList("v0", "v1"))
       .addAllNumPartitions(Ints.asList(2,3));
     UserPayload payload = UserPayload.create(ByteBuffer.wrap(builder.build().toByteArray()));
     when(context.getUserPayload()).thenReturn(payload);
@@ -51,8 +51,8 @@ public class TestCartesianProductEdgeManager {
     // unpartitioned case
     builder.clear();
     builder.setIsPartitioned(false)
-      .addAllSourceVertices(Arrays.asList("v0", "v1"))
-      .addAllNumTasks(Ints.asList(2,3));
+      .addAllSources(Arrays.asList("v0", "v1"))
+      .addAllNumChunks(Ints.asList(2,3));
     payload = UserPayload.create(ByteBuffer.wrap(builder.build().toByteArray()));
     when(context.getUserPayload()).thenReturn(payload);
     when(context.getSourceVertexNumTasks()).thenReturn(2);

http://git-wip-us.apache.org/repos/asf/tez/blob/f355a050/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductEdgeManagerConfig.java
----------------------------------------------------------------------
Review note: the renamed test now checks the per-source chunk counts plus the per-edge numChunk and chunkIdOffset fields decoded by CartesianProductEdgeManagerConfig.fromUserPayload; the local numGroup feeds setNumChunk, so numChunk would be the more consistent name.
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductEdgeManagerConfig.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductEdgeManagerConfig.java
index 9f6fa09..3ba6aad 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductEdgeManagerConfig.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductEdgeManagerConfig.java
@@ -28,23 +28,26 @@ import java.util.ArrayList;
 import java.util.List;
 
 import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
 
 public class TestCartesianProductEdgeManagerConfig {
   @Test(timeout = 5000)
-  public void testAutoGroupingConfig() throws IOException {
+  public void testUnpartitionedAutoGroupingConfig() throws IOException {
     List<String> sourceVertices = new ArrayList<>();
     sourceVertices.add("v0");
     sourceVertices.add("v1");
-    int[] numTasks = new int[] {4, 5};
-    int[] numGroups = new int[] {2, 3};
+    int[] numChunkPerSrc = new int[] {2, 3};
+    int numGroup = 3, chunkIdOffset = 0;
 
     CartesianProductConfigProto.Builder builder = CartesianProductConfigProto.newBuilder();
-    builder.setIsPartitioned(false).addAllNumTasks(Ints.asList(numTasks))
-      .addAllSourceVertices(sourceVertices).addAllNumGroups(Ints.asList(numGroups));
+    builder.setIsPartitioned(false).addAllNumChunks(Ints.asList(numChunkPerSrc))
+      .addAllSources(sourceVertices).setNumChunk(numGroup).setChunkIdOffset(chunkIdOffset);
     UserPayload payload = UserPayload.create(ByteBuffer.wrap(builder.build().toByteArray()));
 
     CartesianProductEdgeManagerConfig config =
       CartesianProductEdgeManagerConfig.fromUserPayload(payload);
-    assertArrayEquals(numGroups, config.getNumGroups());
+    assertArrayEquals(numChunkPerSrc, config.numChunksPerSrc);
+    assertEquals(numGroup, config.numChunk);
+    assertEquals(chunkIdOffset, config.chunkIdOffset);
   }
 }