You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by mi...@apache.org on 2016/09/06 17:51:14 UTC

[1/2] tez git commit: TEZ-3230. Implement vertex manager and edge manager of cartesian product edge. (Zhiyuan Yang via mingma)

Repository: tez
Updated Branches:
  refs/heads/master af8246931 -> 1a068b239


http://git-wip-us.apache.org/repos/asf/tez/blob/1a068b23/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductVertexManagerUnpartitioned.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductVertexManagerUnpartitioned.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductVertexManagerUnpartitioned.java
new file mode 100644
index 0000000..af7d15e
--- /dev/null
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductVertexManagerUnpartitioned.java
@@ -0,0 +1,178 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.runtime.library.cartesianproduct;
+
+import com.google.common.base.Preconditions;
+import com.google.common.primitives.Ints;
+import org.apache.tez.dag.api.EdgeManagerPluginDescriptor;
+import org.apache.tez.dag.api.EdgeProperty;
+import org.apache.tez.dag.api.UserPayload;
+import org.apache.tez.dag.api.VertexManagerPluginContext;
+import org.apache.tez.dag.api.VertexManagerPluginContext.ScheduleTaskRequest;
+import org.apache.tez.dag.api.event.VertexState;
+import org.apache.tez.dag.api.event.VertexStateUpdate;
+import org.apache.tez.runtime.api.TaskAttemptIdentifier;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.BitSet;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+
+import static org.apache.tez.runtime.library.cartesianproduct.CartesianProductUserPayload.CartesianProductConfigProto;
+
+class CartesianProductVertexManagerUnpartitioned extends CartesianProductVertexManagerReal {
+  List<String> sourceVertices;
+  private int parallelism = 1;
+  private boolean vertexStarted = false;
+  private boolean vertexReconfigured = false;
+  private int numSourceVertexConfigured = 0;
+  private int[] numTasks;
+  private Queue<TaskAttemptIdentifier> pendingCompletedSrcTask = new LinkedList<>();
+  private Map<String, BitSet> sourceTaskCompleted = new HashMap<>();
+  private BitSet scheduledTasks = new BitSet();
+  private CartesianProductConfig config;
+  private int numSrcHasCompletedTask = 0;
+
+  public CartesianProductVertexManagerUnpartitioned(VertexManagerPluginContext context) {
+    super(context);
+  }
+
+  @Override
+  public void initialize(CartesianProductVertexManagerConfig config) throws Exception {
+    sourceVertices = config.getSourceVertices();
+    numTasks = new int[sourceVertices.size()];
+    for (String vertex : sourceVertices) {
+      sourceTaskCompleted.put(vertex, new BitSet());
+    }
+    for (String vertex : sourceVertices) {
+      getContext().registerForVertexStateUpdates(vertex, EnumSet.of(VertexState.CONFIGURED));
+    }
+    this.config = config;
+    getContext().vertexReconfigurationPlanned();
+  }
+
+  private void reconfigureVertex() throws IOException {
+    for (int numTask : numTasks) {
+      parallelism *= numTask;
+    }
+
+    UserPayload payload = null;
+    Map<String, EdgeProperty> edgeProperties = getContext().getInputVertexEdgeProperties();
+    for (EdgeProperty edgeProperty : edgeProperties.values()) {
+      EdgeManagerPluginDescriptor descriptor = edgeProperty.getEdgeManagerDescriptor();
+      if (payload == null) {
+        CartesianProductConfigProto.Builder builder = CartesianProductConfigProto.newBuilder();
+        builder.setIsPartitioned(false).addAllNumTasks(Ints.asList(numTasks))
+          .addAllSourceVertices(config.getSourceVertices());
+        payload = UserPayload.create(ByteBuffer.wrap(builder.build().toByteArray()));
+      }
+      descriptor.setUserPayload(payload);
+    }
+    getContext().reconfigureVertex(parallelism, null, edgeProperties);
+    vertexReconfigured = true;
+    getContext().doneReconfiguringVertex();
+  }
+
+  @Override
+  public synchronized void onVertexStarted(List<TaskAttemptIdentifier> completions)
+    throws Exception {
+    vertexStarted = true;
+    // if vertex is already reconfigured, we can handle pending completions immediately
+    // otherwise we have to wait until vertex is reconfigured
+    if (vertexReconfigured) {
+      Preconditions.checkArgument(pendingCompletedSrcTask.size() == 0,
+        "Unexpected pending source completion on vertex start after vertex reconfiguration");
+      for (TaskAttemptIdentifier taId : completions) {
+        handleCompletedSrcTask(taId);
+      }
+    } else {
+      pendingCompletedSrcTask.addAll(completions);
+    }
+  }
+
+  @Override
+  public synchronized void onVertexStateUpdated(VertexStateUpdate stateUpdate) throws IOException {
+    Preconditions.checkArgument(stateUpdate.getVertexState() == VertexState.CONFIGURED);
+    String vertex = stateUpdate.getVertexName();
+    numTasks[sourceVertices.indexOf(vertex)] = getContext().getVertexNumTasks(vertex);
+    // reconfigure vertex when all source vertices are CONFIGURED
+    if (++numSourceVertexConfigured == sourceVertices.size()) {
+      reconfigureVertex();
+      // handle pending source completions when vertex is started and reconfigured
+      if (vertexStarted) {
+        while (!pendingCompletedSrcTask.isEmpty()) {
+          handleCompletedSrcTask(pendingCompletedSrcTask.poll());
+        }
+      }
+    }
+  }
+
+  @Override
+  public synchronized void onSourceTaskCompleted(TaskAttemptIdentifier attempt) throws Exception {
+    if (numSourceVertexConfigured < sourceVertices.size()) {
+      pendingCompletedSrcTask.add(attempt);
+      return;
+    }
+    Preconditions.checkArgument(pendingCompletedSrcTask.size() == 0,
+      "Unexpected pending src completion on source task completed after vertex reconfiguration");
+    handleCompletedSrcTask(attempt);
+  }
+
+  private void handleCompletedSrcTask(TaskAttemptIdentifier attempt) {
+    int taskId = attempt.getTaskIdentifier().getIdentifier();
+    String vertex = attempt.getTaskIdentifier().getVertexIdentifier().getName();
+    if (sourceTaskCompleted.get(vertex).get(taskId)) {
+      return;
+    }
+
+    if (sourceTaskCompleted.get(vertex).isEmpty()) {
+      numSrcHasCompletedTask++;
+    }
+    sourceTaskCompleted.get(vertex).set(taskId);
+    if (numSrcHasCompletedTask != sourceVertices.size()) {
+      return;
+    }
+
+    List<ScheduleTaskRequest> requests = new ArrayList<>();
+    CartesianProductCombination combination = new CartesianProductCombination(numTasks, sourceVertices.indexOf(vertex));
+    combination.firstTaskWithFixedPartition(taskId);
+    do {
+      List<Integer> list = combination.getCombination();
+      boolean readyToSchedule = true;
+      for (int i = 0; i < list.size(); i++) {
+        if (!sourceTaskCompleted.get(sourceVertices.get(i)).get(list.get(i))) {
+          readyToSchedule = false;
+          break;
+        }
+      }
+      if (readyToSchedule && !scheduledTasks.get(combination.getTaskId())) {
+        requests.add(ScheduleTaskRequest.create(combination.getTaskId(), null));
+        scheduledTasks.set(combination.getTaskId());
+      }
+    } while (combination.nextTaskWithFixedPartition());
+    if (!requests.isEmpty()) {
+      getContext().scheduleTasks(requests);
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tez/blob/1a068b23/tez-runtime-library/src/main/proto/CartesianProductPayload.proto
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/proto/CartesianProductPayload.proto b/tez-runtime-library/src/main/proto/CartesianProductPayload.proto
new file mode 100644
index 0000000..39ba82c
--- /dev/null
+++ b/tez-runtime-library/src/main/proto/CartesianProductPayload.proto
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+option java_package = "org.apache.tez.runtime.library.cartesianproduct";
+option java_outer_classname = "CartesianProductUserPayload";
+
+// Payload describing a cartesian product edge; it is built by the vertex manager and handed
+// to the edge manager plugins as their user payload.
+message CartesianProductConfigProto {
+    // true selects the partitioned implementation, false the unpartitioned one
+    required bool isPartitioned = 1;
+    // names of the source vertices taking part in the cartesian product
+    repeated string sourceVertices = 2;
+    // partition counts, filled in the partitioned case
+    // (presumably one entry per source vertex, in sourceVertices order - confirm)
+    repeated int32 numPartitions = 3;
+    // NOTE(review): presumably the class name of the CartesianProductFilter to apply - confirm
+    optional string filterClassName = 4;
+    // NOTE(review): presumably the user payload passed to that filter - confirm
+    optional bytes filterUserPayload = 5;
+    // NOTE(review): minFraction/maxFraction are not used by the code in this change that is
+    // visible here; their meaning should be confirmed against the readers of this message
+    optional float minFraction = 6;
+    optional float maxFraction = 7;
+    // task count of each source vertex, filled in the unpartitioned case
+    repeated int32 numTasks = 8;
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tez/blob/1a068b23/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductCombination.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductCombination.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductCombination.java
new file mode 100644
index 0000000..0d6a928
--- /dev/null
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductCombination.java
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.runtime.library.cartesianproduct;
+
+import com.google.common.primitives.Ints;
+import org.junit.Test;
+
+import java.util.List;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests for {@link CartesianProductCombination}: iteration over all combinations, iteration
+ * with one position held fixed, and decoding a combination from a task id.
+ */
+public class TestCartesianProductCombination {
+  /** Asserts both the per-position indices and the task id of {@code combination}. */
+  private void verifyCombination(CartesianProductCombination combination, int[] result, int taskId) {
+    assertArrayEquals(result, Ints.toArray(combination.getCombination()));
+    assertEquals(taskId, combination.getTaskId());
+  }
+
+  /** Sizes {2,3} with position 0 fixed to 1: position 1 runs through 0, 1, 2. */
+  private void testCombinationTwoWayVertex0() {
+    CartesianProductCombination combination = new CartesianProductCombination(new int[]{2,3}, 0);
+
+    combination.firstTaskWithFixedPartition(1);
+    verifyCombination(combination, new int[]{1,0}, 3);
+    assertTrue(combination.nextTaskWithFixedPartition());
+    verifyCombination(combination, new int[]{1,1}, 4);
+    assertTrue(combination.nextTaskWithFixedPartition());
+    verifyCombination(combination, new int[]{1,2}, 5);
+    assertFalse(combination.nextTaskWithFixedPartition());
+  }
+
+  /** Sizes {2,3} with position 1 fixed to 1: position 0 runs through 0, 1. */
+  private void testCombinationTwoWayVertex1() {
+    CartesianProductCombination combination = new CartesianProductCombination(new int[]{2,3}, 1);
+
+    combination.firstTaskWithFixedPartition(1);
+    verifyCombination(combination, new int[]{0,1}, 1);
+    assertTrue(combination.nextTaskWithFixedPartition());
+    verifyCombination(combination, new int[]{1,1}, 4);
+
+    assertFalse(combination.nextTaskWithFixedPartition());
+  }
+
+  /** Sizes {2,2,2} with the middle position fixed to 1: the outer positions vary. */
+  private void testCombinationThreeWay() {
+    CartesianProductCombination combination = new CartesianProductCombination(new int[]{2,2,2}, 1);
+
+    combination.firstTaskWithFixedPartition(1);
+    verifyCombination(combination, new int[]{0,1,0}, 2);
+    assertTrue(combination.nextTaskWithFixedPartition());
+    verifyCombination(combination, new int[]{0,1,1}, 3);
+    assertTrue(combination.nextTaskWithFixedPartition());
+    verifyCombination(combination, new int[]{1,1,0}, 6);
+    assertTrue(combination.nextTaskWithFixedPartition());
+    verifyCombination(combination, new int[]{1,1,1}, 7);
+    assertFalse(combination.nextTaskWithFixedPartition());
+  }
+
+  @Test(timeout = 5000)
+  public void testCombinationWithFixedPartition() {
+    // two way cartesian product
+    testCombinationTwoWayVertex0();
+    testCombinationTwoWayVertex1();
+
+    // three way cartesian product
+    testCombinationThreeWay();
+  }
+
+  /** Walks all 2 * 3 combinations with firstTask()/nextTask() and checks their order. */
+  @Test(timeout = 5000)
+  public void testCombination() {
+    CartesianProductCombination combination = new CartesianProductCombination(new int[]{2,3});
+    // The list is fetched only once on purpose: the assertions below rely on it reflecting
+    // the state after each firstTask()/nextTask() call.
+    List<Integer> list = combination.getCombination();
+    for (int i = 0; i < 2; i++) {
+      for (int j = 0; j < 3; j++) {
+        if (i == 0 && j == 0) {
+          combination.firstTask();
+        } else {
+          assertTrue(combination.nextTask());
+        }
+        // intValue() selects assertEquals(long, long); assertEquals reports both values on
+        // failure, unlike assertTrue(a == b)
+        assertEquals(i, list.get(0).intValue());
+        assertEquals(j, list.get(1).intValue());
+      }
+    }
+    assertFalse(combination.nextTask());
+  }
+
+  /** Checks that task id i of a {2,3} product decodes to (i / 3, i % 3). */
+  @Test(timeout = 5000)
+  public void testFromTaskId() {
+    for (int i = 0; i < 6; i++) {
+      List<Integer> list = CartesianProductCombination.fromTaskId(new int[]{2,3}, i)
+                                                      .getCombination();
+      assertEquals(i / 3, list.get(0).intValue());
+      assertEquals(i % 3, list.get(1).intValue());
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/1a068b23/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductConfig.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductConfig.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductConfig.java
new file mode 100644
index 0000000..2de750f
--- /dev/null
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductConfig.java
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.runtime.library.cartesianproduct;
+
+import com.google.common.primitives.Ints;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.UserPayload;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+
+/**
+ * Tests that {@link CartesianProductConfig} survives a round trip through
+ * {@code toUserPayload}/{@code fromUserPayload}, for both the partitioned and the
+ * unpartitioned constructor.
+ */
+public class TestCartesianProductConfig {
+  private final TezConfiguration conf = new TezConfiguration();
+
+  @Test(timeout = 5000)
+  public void testSerializationPartitioned() throws IOException {
+    Map<String, Integer> vertexPartitionMap = new HashMap<>();
+    vertexPartitionMap.put("v1", 2);
+    vertexPartitionMap.put("v2", 3);
+    vertexPartitionMap.put("v3", 4);
+    String filterClassName = "filter";
+    byte[] bytes = new byte[10];
+    (new Random()).nextBytes(bytes);
+    CartesianProductFilterDescriptor filterDescriptor =
+      new CartesianProductFilterDescriptor(filterClassName)
+        .setUserPayload(UserPayload.create(ByteBuffer.wrap(bytes)));
+    CartesianProductConfig config =
+      new CartesianProductConfig(vertexPartitionMap, filterDescriptor);
+    UserPayload payload = config.toUserPayload(conf);
+    CartesianProductConfig parsedConfig = CartesianProductConfig.fromUserPayload(payload);
+    assertConfigEquals(config, parsedConfig);
+  }
+
+  @Test(timeout = 5000)
+  public void testSerializationUnpartitioned() throws Exception {
+    List<String> sourceVertices = new ArrayList<>();
+    sourceVertices.add("v1");
+    sourceVertices.add("v2");
+    sourceVertices.add("v3");
+    CartesianProductConfig config =
+      new CartesianProductConfig(sourceVertices);
+    UserPayload payload = config.toUserPayload(conf);
+    CartesianProductConfig parsedConfig = CartesianProductConfig.fromUserPayload(payload);
+    assertConfigEquals(config, parsedConfig);
+
+    // unpartitioned config should have null in numPartitions fields
+    try {
+      config = new CartesianProductConfig(false, new int[]{}, new String[]{"v0","v1"},null);
+      config.checkNumPartitions();
+    } catch (Exception expected) {
+      // the constructor or checkNumPartitions() rejected the non-null numPartitions
+      return;
+    }
+    // report a proper test failure with a message instead of a bare Exception
+    throw new AssertionError(
+      "Unpartitioned config with non-null numPartitions should have been rejected");
+  }
+
+  /**
+   * Asserts that two configs agree on source vertices, number of partitions and filter
+   * descriptor (class name, and payload when both sides carry one).
+   */
+  private void assertConfigEquals(CartesianProductConfig config1, CartesianProductConfig config2) {
+    assertArrayEquals(config1.getSourceVertices().toArray(new String[0]),
+      config2.getSourceVertices().toArray(new String[0]));
+    if (config1.getNumPartitions() == null) {
+      assertNull(config2.getNumPartitions());
+    } else {
+      assertArrayEquals(Ints.toArray(config1.getNumPartitions()),
+        Ints.toArray(config2.getNumPartitions()));
+    }
+    CartesianProductFilterDescriptor descriptor1 = config1.getFilterDescriptor();
+    // must come from config2; reading config1 twice would compare the descriptor to itself
+    CartesianProductFilterDescriptor descriptor2 = config2.getFilterDescriptor();
+
+    if (descriptor1 != null && descriptor2 != null) {
+      assertEquals(descriptor1.getClassName(), descriptor2.getClassName());
+      UserPayload payload1 = descriptor1.getUserPayload();
+      UserPayload payload2 = descriptor2.getUserPayload();
+      if (payload1 != null && payload2 != null) {
+        assertEquals(0, payload1.getPayload().compareTo(payload2.getPayload()));
+      }
+    } else {
+      // at least one side has no descriptor, so both must have none
+      assertNull(descriptor1);
+      assertNull(descriptor2);
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tez/blob/1a068b23/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductEdgeManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductEdgeManager.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductEdgeManager.java
new file mode 100644
index 0000000..9581a6e
--- /dev/null
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductEdgeManager.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.runtime.library.cartesianproduct;
+
+import com.google.common.primitives.Ints;
+import org.apache.tez.dag.api.EdgeManagerPluginContext;
+import org.apache.tez.dag.api.UserPayload;
+import org.junit.Test;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.apache.tez.runtime.library.cartesianproduct.CartesianProductUserPayload.*;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests that {@link CartesianProductEdgeManager} picks its real implementation according to
+ * the {@code isPartitioned} flag of the payload returned by the context.
+ */
+public class TestCartesianProductEdgeManager {
+  @Test(timeout = 5000)
+  public void testInitialize() throws Exception {
+    EdgeManagerPluginContext context = mock(EdgeManagerPluginContext.class);
+    when(context.getSourceVertexName()).thenReturn("v0");
+    CartesianProductEdgeManager edgeManager = new CartesianProductEdgeManager(context);
+    // both cases describe the same two source vertices
+    List<String> sourceVertices = Arrays.asList("v0", "v1");
+
+    // partitioned case
+    CartesianProductConfigProto.Builder builder = CartesianProductConfigProto.newBuilder();
+    builder.setIsPartitioned(true)
+      .addAllSourceVertices(sourceVertices)
+      .addAllNumPartitions(Ints.asList(2, 3));
+    UserPayload payload = UserPayload.create(ByteBuffer.wrap(builder.build().toByteArray()));
+    when(context.getUserPayload()).thenReturn(payload);
+    edgeManager.initialize();
+    assertTrue(edgeManager.getEdgeManagerReal()
+      instanceof CartesianProductEdgeManagerPartitioned);
+
+    // unpartitioned case
+    builder.clear();
+    builder.setIsPartitioned(false)
+      .addAllSourceVertices(sourceVertices)
+      .addAllNumTasks(Ints.asList(2, 3));
+    payload = UserPayload.create(ByteBuffer.wrap(builder.build().toByteArray()));
+    when(context.getUserPayload()).thenReturn(payload);
+    when(context.getSourceVertexNumTasks()).thenReturn(2);
+    edgeManager.initialize();
+    assertTrue(edgeManager.getEdgeManagerReal()
+      instanceof CartesianProductEdgeManagerUnpartitioned);
+  }
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/1a068b23/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductEdgeManagerPartitioned.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductEdgeManagerPartitioned.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductEdgeManagerPartitioned.java
new file mode 100644
index 0000000..2e8697d
--- /dev/null
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductEdgeManagerPartitioned.java
@@ -0,0 +1,284 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.runtime.library.cartesianproduct;
+
+import org.apache.tez.dag.api.EdgeManagerPluginContext;
+import org.apache.tez.dag.api.EdgeManagerPluginOnDemand.EventRouteMetadata;
+import org.apache.tez.dag.api.UserPayload;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.nio.ByteBuffer;
+import java.util.Map;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class TestCartesianProductEdgeManagerPartitioned {
+  private EdgeManagerPluginContext mockContext;
+  private CartesianProductEdgeManagerPartitioned edgeManager;
+
+  /** Gives every test a fresh mocked context and an edge manager created on top of it. */
+  @Before
+  public void setup() {
+    this.mockContext = mock(EdgeManagerPluginContext.class);
+    this.edgeManager = new CartesianProductEdgeManagerPartitioned(this.mockContext);
+  }
+
+  /**
+   * Vertex v0 has 2 tasks which generate 3 partitions
+   * Vertex v1 has 3 tasks which generate 4 partitions
+   */
+  @Test(timeout = 5000)
+  public void testTwoWay() throws Exception {
+    CartesianProductEdgeManagerConfig emConfig =
+      new CartesianProductEdgeManagerConfig(true, new String[]{"v0","v1"}, new int[]{3,4}, null, null);
+    when(mockContext.getDestinationVertexNumTasks()).thenReturn(12);
+    testTwoWayV0(emConfig);
+    testTwoWayV1(emConfig);
+  }
+
+  /** Checks routing and counts with the context mocked as source vertex v0 (2 tasks). */
+  private void testTwoWayV0(CartesianProductEdgeManagerConfig config) throws Exception {
+    when(mockContext.getSourceVertexName()).thenReturn("v0");
+    when(mockContext.getSourceVertexNumTasks()).thenReturn(2);
+    edgeManager.initialize(config);
+
+    // composite data movement event
+    EventRouteMetadata compositeRoute =
+      edgeManager.routeCompositeDataMovementEventToDestination(1, 1);
+    assertNotNull(compositeRoute);
+    assertEquals(1, compositeRoute.getNumEvents());
+    assertArrayEquals(new int[]{0}, compositeRoute.getSourceIndices());
+    assertArrayEquals(new int[]{1}, compositeRoute.getTargetIndices());
+
+    // data movement event: arguments (1,0,1) yield a route ...
+    EventRouteMetadata dataRoute = edgeManager.routeDataMovementEventToDestination(1,0,1);
+    assertNotNull(dataRoute);
+    assertEquals(1, dataRoute.getNumEvents());
+    assertArrayEquals(new int[]{1}, dataRoute.getTargetIndices());
+
+    // ... while arguments (1,1,1) yield none
+    EventRouteMetadata droppedRoute = edgeManager.routeDataMovementEventToDestination(1,1,1);
+    assertNull(droppedRoute);
+
+    // input source task failed event
+    EventRouteMetadata failedRoute =
+      edgeManager.routeInputSourceTaskFailedEventToDestination(1, 1);
+    assertNotNull(failedRoute);
+    assertEquals(1, failedRoute.getNumEvents());
+    assertArrayEquals(new int[]{1}, failedRoute.getTargetIndices());
+
+    // input error event routed back to the source
+    assertEquals(1, edgeManager.routeInputErrorEventToSource(1, 1));
+
+    // consumer / physical input / physical output counts
+    assertEquals(12, edgeManager.getNumDestinationConsumerTasks(1));
+    assertEquals(2, edgeManager.getNumDestinationTaskPhysicalInputs(10));
+    assertEquals(3, edgeManager.getNumSourceTaskPhysicalOutputs(2));
+  }
+
+  /** Checks routing and counts with the context mocked as source vertex v1 (3 tasks). */
+  private void testTwoWayV1(CartesianProductEdgeManagerConfig config) throws Exception {
+    when(mockContext.getSourceVertexName()).thenReturn("v1");
+    when(mockContext.getSourceVertexNumTasks()).thenReturn(3);
+    edgeManager.initialize(config);
+
+    // composite data movement event
+    EventRouteMetadata compositeRoute =
+      edgeManager.routeCompositeDataMovementEventToDestination(1, 1);
+    assertNotNull(compositeRoute);
+    assertEquals(1, compositeRoute.getNumEvents());
+    assertArrayEquals(new int[]{1}, compositeRoute.getSourceIndices());
+    assertArrayEquals(new int[]{1}, compositeRoute.getTargetIndices());
+
+    // input source task failed event
+    EventRouteMetadata failedRoute =
+      edgeManager.routeInputSourceTaskFailedEventToDestination(1, 1);
+    assertNotNull(failedRoute);
+    assertEquals(1, failedRoute.getNumEvents());
+    assertArrayEquals(new int[]{1}, failedRoute.getTargetIndices());
+
+    // input error event routed back to the source
+    assertEquals(2, edgeManager.routeInputErrorEventToSource(1, 2));
+
+    // consumer / physical input / physical output counts
+    assertEquals(12, edgeManager.getNumDestinationConsumerTasks(1));
+    assertEquals(3, edgeManager.getNumDestinationTaskPhysicalInputs(10));
+    assertEquals(4, edgeManager.getNumSourceTaskPhysicalOutputs(2));
+  }
+
+  /**
+   * Filter whose behaviour is chosen by the first char of its payload: '>' keeps
+   * combinations where v0's value is greater than v1's, '<' keeps those where it is smaller,
+   * and any other char keeps every combination.
+   */
+  public static class TestFilter extends CartesianProductFilter {
+    char op;
+
+    public TestFilter(UserPayload payload) {
+      super(payload);
+      op = payload.getPayload().getChar();
+    }
+
+    @Override
+    public boolean isValidCombination(Map<String, Integer> vertexPartitionMap) {
+      if (op == '>') {
+        return vertexPartitionMap.get("v0") > vertexPartitionMap.get("v1");
+      }
+      if (op == '<') {
+        return vertexPartitionMap.get("v0") < vertexPartitionMap.get("v1");
+      }
+      // unknown operator: accept everything
+      return true;
+    }
+  }
+
+  /**
+   * Two-way product filtered with {@link TestFilter} configured as '>':
+   * Vertex v0 has 2 tasks which generate 3 partitions
+   * Vertex v1 has 3 tasks which generate 4 partitions
+   * The destination vertex is mocked with 3 tasks.
+   */
+  @Test(timeout = 5000)
+  public void testTwoWayWithFilter() throws Exception {
+    // payload is the single char '>' read back by TestFilter's constructor
+    ByteBuffer buffer = ByteBuffer.allocate(2);
+    buffer.putChar('>');
+    buffer.flip();
+    CartesianProductFilterDescriptor filterDescriptor =
+      new CartesianProductFilterDescriptor(TestFilter.class.getName())
+        .setUserPayload(UserPayload.create(buffer));
+    CartesianProductEdgeManagerConfig emConfig =
+      new CartesianProductEdgeManagerConfig(true, new String[]{"v0","v1"}, new int[]{3,4}, null,
+        filterDescriptor);
+    when(mockContext.getDestinationVertexNumTasks()).thenReturn(3);
+    testTwoWayV0WithFilter(emConfig);
+    testTwoWayV1WithFilter(emConfig);
+  }
+
+  /** Filtered variant of the v0 checks; the context is mocked as source vertex v0 (2 tasks). */
+  private void testTwoWayV0WithFilter(CartesianProductEdgeManagerConfig config) throws Exception {
+    when(mockContext.getSourceVertexName()).thenReturn("v0");
+    when(mockContext.getSourceVertexNumTasks()).thenReturn(2);
+    edgeManager.initialize(config);
+
+    // composite data movement event
+    EventRouteMetadata compositeRoute =
+      edgeManager.routeCompositeDataMovementEventToDestination(1, 1);
+    assertNotNull(compositeRoute);
+    assertEquals(1, compositeRoute.getNumEvents());
+    assertArrayEquals(new int[]{2}, compositeRoute.getSourceIndices());
+    assertArrayEquals(new int[]{1}, compositeRoute.getTargetIndices());
+
+    // input source task failed event
+    EventRouteMetadata failedRoute =
+      edgeManager.routeInputSourceTaskFailedEventToDestination(1, 1);
+    assertNotNull(failedRoute);
+    assertEquals(1, failedRoute.getNumEvents());
+    assertArrayEquals(new int[]{1}, failedRoute.getTargetIndices());
+
+    // input error event routed back to the source
+    assertEquals(1, edgeManager.routeInputErrorEventToSource(1, 1));
+
+    // consumer / physical input / physical output counts
+    assertEquals(3, edgeManager.getNumDestinationConsumerTasks(1));
+    assertEquals(2, edgeManager.getNumDestinationTaskPhysicalInputs(1));
+    assertEquals(3, edgeManager.getNumSourceTaskPhysicalOutputs(2));
+  }
+
+  /**
+   * Checks the filtered two-way edge manager with v1 (3 tasks) acting as the source vertex.
+   *
+   * @param config edge manager config shared by both source vertices of the test
+   * @throws Exception if the edge manager fails to initialize or to route an event
+   */
+  private void testTwoWayV1WithFilter(CartesianProductEdgeManagerConfig config) throws Exception {
+    when(mockContext.getSourceVertexNumTasks()).thenReturn(3);
+    when(mockContext.getSourceVertexName()).thenReturn("v1");
+    edgeManager.initialize(config);
+
+    // composite data movement event for (1, 1)
+    final EventRouteMetadata dataRoute =
+      edgeManager.routeCompositeDataMovementEventToDestination(1, 1);
+    assertNotNull(dataRoute);
+    assertEquals(1, dataRoute.getNumEvents());
+    assertArrayEquals(new int[]{0}, dataRoute.getSourceIndices());
+    assertArrayEquals(new int[]{1}, dataRoute.getTargetIndices());
+
+    // source-task-failed event for (1, 1)
+    final EventRouteMetadata failureRoute =
+      edgeManager.routeInputSourceTaskFailedEventToDestination(1, 1);
+    assertNotNull(failureRoute);
+    assertEquals(1, failureRoute.getNumEvents());
+    assertArrayEquals(new int[]{1}, failureRoute.getTargetIndices());
+
+    // input error routing
+    assertEquals(2, edgeManager.routeInputErrorEventToSource(1, 2));
+
+    // fan-out / fan-in counts
+    assertEquals(3, edgeManager.getNumDestinationConsumerTasks(1));
+    assertEquals(3, edgeManager.getNumDestinationTaskPhysicalInputs(10));
+    assertEquals(4, edgeManager.getNumSourceTaskPhysicalOutputs(2));
+  }
+
+  /**
+   * Three-way cartesian product over partitioned sources, without a filter.
+   * Vertex v0 has 2 tasks which generate 4 partitions
+   * Vertex v1 has 3 tasks which generate 3 partitions
+   * Vertex v2 has 4 tasks which generate 2 partitions
+   *
+   * @throws Exception if the edge manager fails to initialize or to route an event
+   */
+  @Test(timeout = 5000)
+  public void testThreeWay() throws Exception {
+    final String[] sourceVertices = new String[]{"v0","v1","v2"};
+    final int[] numPartitions = new int[]{4,3,2};
+    final CartesianProductEdgeManagerConfig emConfig =
+      new CartesianProductEdgeManagerConfig(true, sourceVertices, numPartitions, null, null);
+    // 4 * 3 * 2 partition combinations
+    when(mockContext.getDestinationVertexNumTasks()).thenReturn(24);
+
+    testThreeWayV0(emConfig);
+    testThreeWayV1(emConfig);
+    testThreeWayV2(emConfig);
+  }
+
+  /**
+   * Checks the three-way edge manager with v0 (2 tasks) acting as the source vertex.
+   *
+   * @param config edge manager config shared by all source vertices of the test
+   * @throws Exception if the edge manager fails to initialize or to route an event
+   */
+  private void testThreeWayV0(CartesianProductEdgeManagerConfig config) throws Exception {
+    when(mockContext.getSourceVertexNumTasks()).thenReturn(2);
+    when(mockContext.getSourceVertexName()).thenReturn("v0");
+    edgeManager.initialize(config);
+
+    // composite data movement event for (1, 1)
+    final EventRouteMetadata dataRoute =
+      edgeManager.routeCompositeDataMovementEventToDestination(1, 1);
+    assertNotNull(dataRoute);
+    assertEquals(1, dataRoute.getNumEvents());
+    assertArrayEquals(new int[]{0}, dataRoute.getSourceIndices());
+    assertArrayEquals(new int[]{1}, dataRoute.getTargetIndices());
+
+    // source-task-failed event for (1, 1)
+    final EventRouteMetadata failureRoute =
+      edgeManager.routeInputSourceTaskFailedEventToDestination(1, 1);
+    assertNotNull(failureRoute);
+    assertEquals(1, failureRoute.getNumEvents());
+    assertArrayEquals(new int[]{1}, failureRoute.getTargetIndices());
+
+    // input error routing
+    assertEquals(1, edgeManager.routeInputErrorEventToSource(1, 1));
+
+    // fan-out / fan-in counts
+    assertEquals(24, edgeManager.getNumDestinationConsumerTasks(1));
+    assertEquals(2, edgeManager.getNumDestinationTaskPhysicalInputs(10));
+    assertEquals(4, edgeManager.getNumSourceTaskPhysicalOutputs(2));
+  }
+
+  /**
+   * Checks the three-way edge manager with v1 (3 tasks) acting as the source vertex.
+   *
+   * @param config edge manager config shared by all source vertices of the test
+   * @throws Exception if the edge manager fails to initialize or to route an event
+   */
+  private void testThreeWayV1(CartesianProductEdgeManagerConfig config) throws Exception {
+    when(mockContext.getSourceVertexNumTasks()).thenReturn(3);
+    when(mockContext.getSourceVertexName()).thenReturn("v1");
+    edgeManager.initialize(config);
+
+    // composite data movement event for (1, 1)
+    final EventRouteMetadata dataRoute =
+      edgeManager.routeCompositeDataMovementEventToDestination(1, 1);
+    assertNotNull(dataRoute);
+    assertEquals(1, dataRoute.getNumEvents());
+    assertArrayEquals(new int[]{0}, dataRoute.getSourceIndices());
+    assertArrayEquals(new int[]{1}, dataRoute.getTargetIndices());
+
+    // source-task-failed event for (1, 1)
+    final EventRouteMetadata failureRoute =
+      edgeManager.routeInputSourceTaskFailedEventToDestination(1, 1);
+    assertNotNull(failureRoute);
+    assertEquals(1, failureRoute.getNumEvents());
+    assertArrayEquals(new int[]{1}, failureRoute.getTargetIndices());
+
+    // input error routing
+    assertEquals(2, edgeManager.routeInputErrorEventToSource(1, 2));
+
+    // fan-out / fan-in counts
+    assertEquals(24, edgeManager.getNumDestinationConsumerTasks(1));
+    assertEquals(3, edgeManager.getNumDestinationTaskPhysicalInputs(10));
+    assertEquals(3, edgeManager.getNumSourceTaskPhysicalOutputs(2));
+  }
+
+  /**
+   * Checks the three-way edge manager with v2 (4 tasks) acting as the source vertex.
+   *
+   * @param config edge manager config shared by all source vertices of the test
+   * @throws Exception if the edge manager fails to initialize or to route an event
+   */
+  private void testThreeWayV2(CartesianProductEdgeManagerConfig config) throws Exception {
+    when(mockContext.getSourceVertexNumTasks()).thenReturn(4);
+    when(mockContext.getSourceVertexName()).thenReturn("v2");
+    edgeManager.initialize(config);
+
+    // composite data movement event for (1, 1)
+    final EventRouteMetadata dataRoute =
+      edgeManager.routeCompositeDataMovementEventToDestination(1, 1);
+    assertNotNull(dataRoute);
+    assertEquals(1, dataRoute.getNumEvents());
+    assertArrayEquals(new int[]{1}, dataRoute.getSourceIndices());
+    assertArrayEquals(new int[]{1}, dataRoute.getTargetIndices());
+
+    // source-task-failed event for (1, 1)
+    final EventRouteMetadata failureRoute =
+      edgeManager.routeInputSourceTaskFailedEventToDestination(1, 1);
+    assertNotNull(failureRoute);
+    assertEquals(1, failureRoute.getNumEvents());
+    assertArrayEquals(new int[]{1}, failureRoute.getTargetIndices());
+
+    // input error routing
+    assertEquals(2, edgeManager.routeInputErrorEventToSource(1, 2));
+
+    // fan-out / fan-in counts
+    assertEquals(24, edgeManager.getNumDestinationConsumerTasks(1));
+    assertEquals(4, edgeManager.getNumDestinationTaskPhysicalInputs(10));
+    assertEquals(2, edgeManager.getNumSourceTaskPhysicalOutputs(2));
+  }
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/1a068b23/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductEdgeManagerUnpartitioned.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductEdgeManagerUnpartitioned.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductEdgeManagerUnpartitioned.java
new file mode 100644
index 0000000..4c69482
--- /dev/null
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductEdgeManagerUnpartitioned.java
@@ -0,0 +1,240 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.runtime.library.cartesianproduct;
+
+import org.apache.tez.dag.api.EdgeManagerPluginContext;
+import org.apache.tez.dag.api.EdgeManagerPluginOnDemand.EventRouteMetadata;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests for {@link CartesianProductEdgeManagerUnpartitioned} driven through a mocked
+ * {@link EdgeManagerPluginContext}.
+ */
+public class TestCartesianProductEdgeManagerUnpartitioned {
+  private EdgeManagerPluginContext mockContext;
+  private CartesianProductEdgeManagerUnpartitioned edgeManager;
+
+  /** Creates a fresh mocked context and an edge manager bound to it before each test. */
+  @Before
+  public void setup() {
+    mockContext = mock(EdgeManagerPluginContext.class);
+    edgeManager = new CartesianProductEdgeManagerUnpartitioned(mockContext);
+  }
+
+  /**
+   * Stubs the mocked context so that the given vertex is reported as the source vertex and
+   * then (re)initializes the edge manager with the supplied config.
+   */
+  private void initializeAsSource(String vertexName, int numTasks,
+                                  CartesianProductEdgeManagerConfig config) {
+    when(mockContext.getSourceVertexName()).thenReturn(vertexName);
+    when(mockContext.getSourceVertexNumTasks()).thenReturn(numTasks);
+    edgeManager.initialize(config);
+  }
+
+  /**
+   * Asserts that a composite data movement event for the given task pair is routed as exactly
+   * one event with target index 0 and source index 0.
+   */
+  private void assertSingleEventRoute(int srcTaskId, int destTaskId) throws Exception {
+    EventRouteMetadata route =
+      edgeManager.routeCompositeDataMovementEventToDestination(srcTaskId, destTaskId);
+    assertNotNull(route);
+    assertEquals(1, route.getNumEvents());
+    assertArrayEquals(new int[]{0}, route.getTargetIndices());
+    assertArrayEquals(new int[]{0}, route.getSourceIndices());
+  }
+
+  /**
+   * Runs the checks shared by every per-vertex case, always using source task 1.
+   *
+   * @param config edge manager config under test
+   * @param vertexName name reported as the source vertex
+   * @param numTasks number of tasks reported for the source vertex
+   * @param unroutedDestTask destination task for which routing must yield null
+   * @param routedDestTask destination task for which routing must yield a single event
+   * @param expectedErrorSourceTask expected result of routeInputErrorEventToSource(1, 0)
+   * @param expectedConsumerTasks expected result of getNumDestinationConsumerTasks(1)
+   */
+  private void verifySourceVertex(CartesianProductEdgeManagerConfig config, String vertexName,
+                                  int numTasks, int unroutedDestTask, int routedDestTask,
+                                  int expectedErrorSourceTask, int expectedConsumerTasks)
+    throws Exception {
+    initializeAsSource(vertexName, numTasks, config);
+
+    assertNull(edgeManager.routeCompositeDataMovementEventToDestination(1, unroutedDestTask));
+    assertSingleEventRoute(1, routedDestTask);
+
+    assertNull(edgeManager.routeInputSourceTaskFailedEventToDestination(1, unroutedDestTask));
+    // NOTE(review): the positive check below re-routes a composite data movement event; it
+    // looks like it was meant to exercise routeInputSourceTaskFailedEventToDestination
+    // instead - confirm against the edge manager implementation before changing it.
+    assertSingleEventRoute(1, routedDestTask);
+
+    assertEquals(expectedErrorSourceTask, edgeManager.routeInputErrorEventToSource(1, 0));
+
+    assertEquals(1, edgeManager.getNumDestinationTaskPhysicalInputs(1));
+    assertEquals(1, edgeManager.getNumSourceTaskPhysicalOutputs(1));
+    assertEquals(expectedConsumerTasks, edgeManager.getNumDestinationConsumerTasks(1));
+  }
+
+  /**
+   * Vertex v0 has 2 tasks
+   * Vertex v1 has 3 tasks
+   */
+  @Test(timeout = 5000)
+  public void testTwoWay() throws Exception {
+    final String[] sourceVertices = new String[]{"v0","v1"};
+    final int[] numTasks = new int[]{2,3};
+    CartesianProductEdgeManagerConfig emConfig =
+      new CartesianProductEdgeManagerConfig(false, sourceVertices, null, numTasks, null);
+    testTwoWayV0(emConfig);
+    testTwoWayV1(emConfig);
+  }
+
+  /** Two-way case seen from source vertex v0. */
+  private void testTwoWayV0(CartesianProductEdgeManagerConfig config) throws Exception {
+    verifySourceVertex(config, "v0", 2, 1, 3, 0, 3);
+  }
+
+  /** Two-way case seen from source vertex v1. */
+  private void testTwoWayV1(CartesianProductEdgeManagerConfig config) throws Exception {
+    verifySourceVertex(config, "v1", 3, 2, 1, 1, 2);
+  }
+
+  /**
+   * Vertex v0 has 2 tasks
+   * Vertex v1 has 3 tasks
+   * Vertex v2 has 4 tasks
+   */
+  @Test(timeout = 5000)
+  public void testThreeWay() throws Exception {
+    final String[] sourceVertices = new String[]{"v0","v1","v2"};
+    final int[] numTasks = new int[]{2,3,4};
+    CartesianProductEdgeManagerConfig emConfig =
+      new CartesianProductEdgeManagerConfig(false, sourceVertices, null, numTasks, null);
+    testThreeWayV0(emConfig);
+    testThreeWayV1(emConfig);
+    testThreeWayV2(emConfig);
+  }
+
+  /** Three-way case seen from source vertex v0. */
+  private void testThreeWayV0(CartesianProductEdgeManagerConfig config) throws Exception {
+    verifySourceVertex(config, "v0", 2, 1, 12, 0, 12);
+  }
+
+  /** Three-way case seen from source vertex v1. */
+  private void testThreeWayV1(CartesianProductEdgeManagerConfig config) throws Exception {
+    verifySourceVertex(config, "v1", 3, 1, 16, 0, 8);
+  }
+
+  /** Three-way case seen from source vertex v2. */
+  private void testThreeWayV2(CartesianProductEdgeManagerConfig config) throws Exception {
+    verifySourceVertex(config, "v2", 4, 0, 13, 1, 6);
+  }
+
+  /** One of the two source vertices (v1) has no tasks at all. */
+  @Test(timeout = 5000)
+  public void testZeroSrcTask() {
+    final String[] sourceVertices = new String[]{"v0", "v1"};
+    final int[] numTasks = new int[]{2, 0};
+    CartesianProductEdgeManagerConfig emConfig =
+      new CartesianProductEdgeManagerConfig(false, sourceVertices, null, numTasks, null);
+    testZeroSrcTaskV0(emConfig);
+    testZeroSrcTaskV1(emConfig);
+  }
+
+  /** With v0 as source, neither of its tasks is expected to have a consumer. */
+  private void testZeroSrcTaskV0(CartesianProductEdgeManagerConfig config) {
+    initializeAsSource("v0", 2, config);
+
+    for (int srcTask = 0; srcTask < 2; srcTask++) {
+      assertEquals(0, edgeManager.getNumDestinationConsumerTasks(srcTask));
+    }
+  }
+
+  /** With the empty vertex v1 as source, only initialization itself is exercised. */
+  private void testZeroSrcTaskV1(CartesianProductEdgeManagerConfig config) {
+    initializeAsSource("v1", 0, config);
+  }
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/1a068b23/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductVertexManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductVertexManager.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductVertexManager.java
new file mode 100644
index 0000000..755c578
--- /dev/null
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductVertexManager.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.runtime.library.cartesianproduct;
+
+import org.apache.tez.dag.api.EdgeManagerPluginDescriptor;
+import org.apache.tez.dag.api.EdgeProperty;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.VertexManagerPluginContext;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests for {@link CartesianProductVertexManager}.
+ */
+public class TestCartesianProductVertexManager {
+  /**
+   * Verifies that {@code initialize()} installs the partitioned or the unpartitioned vertex
+   * manager depending on the config carried in the user payload.
+   *
+   * @throws Exception if the config cannot be serialized or initialization fails
+   */
+  @Test(timeout = 5000)
+  public void testInitialize() throws Exception {
+    final VertexManagerPluginContext pluginContext = mock(VertexManagerPluginContext.class);
+    final CartesianProductVertexManager manager =
+      new CartesianProductVertexManager(pluginContext);
+    final TezConfiguration tezConf = new TezConfiguration();
+    final String[] vertexNames = new String[]{"v0", "v1"};
+
+    // partitioned case
+    final CartesianProductConfig partitionedConfig =
+      new CartesianProductConfig(new int[]{2,3}, vertexNames, null);
+    when(pluginContext.getUserPayload()).thenReturn(partitionedConfig.toUserPayload(tezConf));
+    final EdgeProperty cartesianProductEdge =
+      EdgeProperty.create(EdgeManagerPluginDescriptor.create(
+        CartesianProductEdgeManager.class.getName()), null, null, null, null);
+    final Map<String, EdgeProperty> inputEdges = new HashMap<>();
+    for (String vertexName : vertexNames) {
+      inputEdges.put(vertexName, cartesianProductEdge);
+    }
+    when(pluginContext.getInputVertexEdgeProperties()).thenReturn(inputEdges);
+    manager.initialize();
+    assertTrue(manager.getVertexManagerReal()
+      instanceof CartesianProductVertexManagerPartitioned);
+
+    // unpartitioned case
+    final List<String> sourceVertices = new ArrayList<>();
+    for (String vertexName : vertexNames) {
+      sourceVertices.add(vertexName);
+    }
+    final CartesianProductConfig unpartitionedConfig = new CartesianProductConfig(sourceVertices);
+    when(pluginContext.getUserPayload()).thenReturn(unpartitionedConfig.toUserPayload(tezConf));
+    manager.initialize();
+    assertTrue(manager.getVertexManagerReal()
+      instanceof CartesianProductVertexManagerUnpartitioned);
+  }
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/1a068b23/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductVertexManagerPartitioned.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductVertexManagerPartitioned.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductVertexManagerPartitioned.java
new file mode 100644
index 0000000..9aca647
--- /dev/null
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductVertexManagerPartitioned.java
@@ -0,0 +1,230 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.runtime.library.cartesianproduct;
+
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.tez.dag.api.EdgeManagerPluginDescriptor;
+import org.apache.tez.dag.api.EdgeProperty;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.UserPayload;
+import org.apache.tez.dag.api.VertexLocationHint;
+import org.apache.tez.dag.api.VertexManagerPluginContext;
+import org.apache.tez.dag.api.VertexManagerPluginContext.ScheduleTaskRequest;
+import org.apache.tez.dag.api.event.VertexState;
+import org.apache.tez.dag.api.event.VertexStateUpdate;
+import org.apache.tez.dag.records.TaskAttemptIdentifierImpl;
+import org.apache.tez.dag.records.TezDAGID;
+import org.apache.tez.dag.records.TezTaskAttemptID;
+import org.apache.tez.dag.records.TezTaskID;
+import org.apache.tez.dag.records.TezVertexID;
+import org.apache.tez.runtime.api.TaskAttemptIdentifier;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Captor;
+import org.mockito.Matchers;
+import org.mockito.Mockito;
+import org.mockito.MockitoAnnotations;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.mockito.Matchers.isNull;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests for the partitioned flavour of {@link CartesianProductVertexManager}, driven through a
+ * mocked {@link VertexManagerPluginContext}.
+ */
+public class TestCartesianProductVertexManagerPartitioned {
+  @Captor
+  private ArgumentCaptor<Map<String, EdgeProperty>> edgePropertiesCaptor;
+  @Captor
+  private ArgumentCaptor<List<ScheduleTaskRequest>> scheduleTaskRequestCaptor;
+  private TezConfiguration conf = new TezConfiguration();
+
+  /** Instantiates the {@code @Captor} fields before each test. */
+  @Before
+  public void init() {
+    MockitoAnnotations.initMocks(this);
+  }
+
+  /** Filter that accepts a combination only when v0's partition is greater than v1's. */
+  public static class TestFilter extends CartesianProductFilter {
+    public TestFilter(UserPayload payload) {
+      super(payload);
+    }
+
+    @Override
+    public boolean isValidCombination(Map<String, Integer> vertexPartitionMap) {
+      return vertexPartitionMap.get("v0") > vertexPartitionMap.get("v1");
+    }
+  }
+
+  /** Builds an edge property that uses {@link CartesianProductEdgeManager} as its manager. */
+  private EdgeProperty createCartesianProductEdgeProperty() {
+    return EdgeProperty.create(EdgeManagerPluginDescriptor.create(
+      CartesianProductEdgeManager.class.getName()), null, null, null, null);
+  }
+
+  /**
+   * Creates a mocked context for a v0 x v1 product with 2 partitions per source vertex, where
+   * each source vertex reports 4 tasks and both inputs use the cartesian product edge.
+   *
+   * @return the stubbed context
+   * @throws Exception if the config cannot be serialized into a user payload
+   */
+  private VertexManagerPluginContext createTwoByTwoContext() throws Exception {
+    CartesianProductConfig config = new CartesianProductConfig(new int[]{2,2},
+      new String[]{"v0", "v1"}, null);
+    VertexManagerPluginContext mockContext = mock(VertexManagerPluginContext.class);
+    when(mockContext.getUserPayload()).thenReturn(config.toUserPayload(conf));
+    Set<String> inputVertices = new HashSet<String>();
+    inputVertices.add("v0");
+    inputVertices.add("v1");
+    when(mockContext.getVertexInputNames()).thenReturn(inputVertices);
+    when(mockContext.getVertexNumTasks("v0")).thenReturn(4);
+    when(mockContext.getVertexNumTasks("v1")).thenReturn(4);
+    EdgeProperty edgeProperty = createCartesianProductEdgeProperty();
+    Map<String, EdgeProperty> inputEdgeProperties = new HashMap<String, EdgeProperty>();
+    inputEdgeProperties.put("v0", edgeProperty);
+    inputEdgeProperties.put("v1", edgeProperty);
+    when(mockContext.getInputVertexEdgeProperties()).thenReturn(inputEdgeProperties);
+    return mockContext;
+  }
+
+  /**
+   * Initializes a vertex manager from the given config, reports v0 as CONFIGURED and checks
+   * that the vertex is reconfigured exactly once with the expected parallelism, a null
+   * location hint and null edge properties.
+   *
+   * @param config cartesian product config handed to the manager via the user payload
+   * @param parallelism expected parallelism passed to reconfigureVertex
+   * @throws Exception if the config cannot be serialized or the manager fails
+   */
+  private void testReconfigureVertexHelper(CartesianProductConfig config, int parallelism)
+    throws Exception {
+    VertexManagerPluginContext mockContext = mock(VertexManagerPluginContext.class);
+    when(mockContext.getUserPayload()).thenReturn(config.toUserPayload(conf));
+
+    EdgeProperty edgeProperty = createCartesianProductEdgeProperty();
+    Map<String, EdgeProperty> inputEdgeProperties = new HashMap<>();
+    for (String vertex : config.getSourceVertices()) {
+      inputEdgeProperties.put(vertex, edgeProperty);
+    }
+    when(mockContext.getInputVertexEdgeProperties()).thenReturn(inputEdgeProperties);
+    CartesianProductVertexManager vertexManager = new CartesianProductVertexManager(mockContext);
+    vertexManager.initialize();
+    ArgumentCaptor<Integer> parallelismCaptor = ArgumentCaptor.forClass(Integer.class);
+
+    vertexManager.onVertexStateUpdated(new VertexStateUpdate("v0", VertexState.CONFIGURED));
+    verify(mockContext, times(1)).reconfigureVertex(parallelismCaptor.capture(),
+      isNull(VertexLocationHint.class), edgePropertiesCaptor.capture());
+    // expected value goes first so that a failure message reports the right roles
+    assertEquals(parallelism, (int)parallelismCaptor.getValue());
+    assertNull(edgePropertiesCaptor.getValue());
+  }
+
+  /**
+   * 5 x 5 partitions: with {@link TestFilter} (v0 > v1) 10 combinations remain, without a
+   * filter all 25 do.
+   */
+  @Test(timeout = 5000)
+  public void testReconfigureVertex() throws Exception {
+    testReconfigureVertexHelper(
+      new CartesianProductConfig(new int[]{5,5}, new String[]{"v0", "v1"},
+        new CartesianProductFilterDescriptor(TestFilter.class.getName())), 10);
+    testReconfigureVertexHelper(
+      new CartesianProductConfig(new int[]{5,5}, new String[]{"v0", "v1"}, null), 25);
+  }
+
+  /**
+   * Feeds eight source task completions to the manager and checks when tasks get scheduled:
+   * nothing after the first two, one task (indices 0..3 in order) after each of the next
+   * four, and nothing more after the last two.
+   */
+  @Test(timeout = 5000)
+  public void testScheduling() throws Exception {
+    VertexManagerPluginContext mockContext = createTwoByTwoContext();
+    CartesianProductVertexManager vertexManager = new CartesianProductVertexManager(mockContext);
+    vertexManager.initialize();
+
+    vertexManager.onVertexStarted(new ArrayList<TaskAttemptIdentifier>());
+    vertexManager.onVertexStateUpdated(new VertexStateUpdate("v0", VertexState.CONFIGURED));
+    vertexManager.onVertexStateUpdated(new VertexStateUpdate("v1", VertexState.CONFIGURED));
+
+    // One deep-stubbed identifier is reused for every completion; consecutive stubbing makes
+    // successive reads return the next vertex name / task index (presumably one pair per
+    // onSourceTaskCompleted call - depends on how often the manager reads them).
+    TaskAttemptIdentifier taId = mock(TaskAttemptIdentifier.class, Mockito.RETURNS_DEEP_STUBS);
+    when(taId.getTaskIdentifier().getVertexIdentifier().getName()).thenReturn("v0", "v0", "v1",
+      "v1", "v0", "v0", "v1", "v1");
+    when(taId.getTaskIdentifier().getIdentifier()).thenReturn(0, 1, 0, 1, 2, 3, 2, 3);
+
+    for (int i = 0; i < 2; i++) {
+      vertexManager.onSourceTaskCompleted(taId);
+      verify(mockContext, never()).scheduleTasks(Matchers.<List<ScheduleTaskRequest>>any());
+    }
+
+    List<ScheduleTaskRequest> scheduleTaskRequests;
+
+    // each of the next four completions triggers exactly one more scheduleTasks call
+    for (int expectedTaskIndex = 0; expectedTaskIndex < 4; expectedTaskIndex++) {
+      vertexManager.onSourceTaskCompleted(taId);
+      verify(mockContext, times(expectedTaskIndex + 1))
+        .scheduleTasks(scheduleTaskRequestCaptor.capture());
+      scheduleTaskRequests = scheduleTaskRequestCaptor.getValue();
+      assertEquals(1, scheduleTaskRequests.size());
+      assertEquals(expectedTaskIndex, scheduleTaskRequests.get(0).getTaskIndex());
+    }
+
+    for (int i = 0; i < 2; i++) {
+      vertexManager.onSourceTaskCompleted(taId);
+      verify(mockContext, times(4)).scheduleTasks(Matchers.<List<ScheduleTaskRequest>>any());
+    }
+  }
+
+  /**
+   * Starts the vertex with three source completions already reported (v0 tasks 0 and 1, v1
+   * task 0) and checks that exactly one task, index 0, is scheduled.
+   */
+  @Test(timeout = 5000)
+  public void testVertexStartWithCompletion() throws Exception {
+    VertexManagerPluginContext mockContext = createTwoByTwoContext();
+    CartesianProductVertexManager vertexManager = new CartesianProductVertexManager(mockContext);
+    vertexManager.initialize();
+
+    vertexManager.onVertexStateUpdated(new VertexStateUpdate("v0", VertexState.CONFIGURED));
+    vertexManager.onVertexStateUpdated(new VertexStateUpdate("v1", VertexState.CONFIGURED));
+
+    List<TaskAttemptIdentifier> completions = new ArrayList<>();
+    TezDAGID dagId = TezDAGID.getInstance(ApplicationId.newInstance(0, 0), 0);
+    TezVertexID v0Id = TezVertexID.getInstance(dagId, 0);
+    TezVertexID v1Id = TezVertexID.getInstance(dagId, 1);
+
+    completions.add(new TaskAttemptIdentifierImpl("dag", "v0",
+      TezTaskAttemptID.getInstance(TezTaskID.getInstance(v0Id, 0), 0)));
+    completions.add(new TaskAttemptIdentifierImpl("dag", "v0",
+      TezTaskAttemptID.getInstance(TezTaskID.getInstance(v0Id, 1), 0)));
+    completions.add(new TaskAttemptIdentifierImpl("dag", "v1",
+      TezTaskAttemptID.getInstance(TezTaskID.getInstance(v1Id, 0), 0)));
+
+    vertexManager.onVertexStarted(completions);
+
+    List<ScheduleTaskRequest> scheduleTaskRequests;
+    verify(mockContext, times(1)).scheduleTasks(scheduleTaskRequestCaptor.capture());
+    scheduleTaskRequests = scheduleTaskRequestCaptor.getValue();
+    assertEquals(1, scheduleTaskRequests.size());
+    assertEquals(0, scheduleTaskRequests.get(0).getTaskIndex());
+  }
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/1a068b23/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductVertexManagerUnpartitioned.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductVertexManagerUnpartitioned.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductVertexManagerUnpartitioned.java
new file mode 100644
index 0000000..f76de96
--- /dev/null
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductVertexManagerUnpartitioned.java
@@ -0,0 +1,194 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.runtime.library.cartesianproduct;
+
+import org.apache.tez.dag.api.EdgeProperty;
+import org.apache.tez.dag.api.UserPayload;
+import org.apache.tez.dag.api.VertexLocationHint;
+import org.apache.tez.dag.api.VertexManagerPluginContext;
+import org.apache.tez.dag.api.VertexManagerPluginContext.ScheduleTaskRequest;
+import org.apache.tez.dag.api.event.VertexState;
+import org.apache.tez.dag.api.event.VertexStateUpdate;
+import org.apache.tez.dag.records.TaskAttemptIdentifierImpl;
+import org.apache.tez.dag.records.TezDAGID;
+import org.apache.tez.dag.records.TezTaskAttemptID;
+import org.apache.tez.dag.records.TezTaskID;
+import org.apache.tez.dag.records.TezVertexID;
+import org.apache.tez.runtime.api.TaskAttemptIdentifier;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Captor;
+import org.mockito.Matchers;
+import org.mockito.MockitoAnnotations;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyInt;
+import static org.mockito.Matchers.anyMapOf;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Matchers.isNull;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests for {@link CartesianProductVertexManagerUnpartitioned}, driven through a mocked
+ * {@link VertexManagerPluginContext}. Unless a test overrides it, source vertex "v0" reports
+ * 2 tasks and source vertex "v1" reports 3 tasks.
+ */
+public class TestCartesianProductVertexManagerUnpartitioned {
+  @Captor
+  private ArgumentCaptor<Map<String, EdgeProperty>> edgePropertiesCaptor;
+  @Captor
+  private ArgumentCaptor<List<ScheduleTaskRequest>> scheduleTaskRequestCaptor;
+  private CartesianProductVertexManagerUnpartitioned vertexManager;
+  private VertexManagerPluginContext context;
+  // one completed attempt per source task: entries 0-1 belong to v0, entries 2-4 to v1
+  private List<TaskAttemptIdentifier> allCompletions;
+
+  /**
+   * Builds the identifier of attempt 0 of one source task.
+   *
+   * @param vertexName name of the source vertex the task belongs to
+   * @param vertexIndex index used to build the vertex id
+   * @param taskIndex index of the task within the vertex
+   */
+  private static TaskAttemptIdentifier createCompletion(String vertexName, int vertexIndex,
+                                                        int taskIndex) {
+    TezVertexID vertexId = TezVertexID.getInstance(TezDAGID.getInstance("0", 0, 0), vertexIndex);
+    return new TaskAttemptIdentifierImpl("dag", vertexName,
+      TezTaskAttemptID.getInstance(TezTaskID.getInstance(vertexId, taskIndex), 0));
+  }
+
+  /**
+   * Creates the mocked context, initializes an unpartitioned vertex manager over v0 and v1, and
+   * prepares one completion per source task.
+   */
+  @Before
+  public void setup() throws Exception {
+    MockitoAnnotations.initMocks(this);
+    context = mock(VertexManagerPluginContext.class);
+    vertexManager = new CartesianProductVertexManagerUnpartitioned(context);
+    when(context.getVertexNumTasks(eq("v0"))).thenReturn(2);
+    when(context.getVertexNumTasks(eq("v1"))).thenReturn(3);
+
+    CartesianProductVertexManagerConfig config =
+      new CartesianProductVertexManagerConfig(false, new String[]{"v0","v1"}, null, 0, 0, null);
+    vertexManager.initialize(config);
+    allCompletions = new ArrayList<>();
+    // every entry names a distinct task; varying only the attempt number would make all of
+    // them refer to task 0 of their vertex
+    allCompletions.add(createCompletion("v0", 0, 0));
+    allCompletions.add(createCompletion("v0", 0, 1));
+    allCompletions.add(createCompletion("v1", 1, 0));
+    allCompletions.add(createCompletion("v1", 1, 1));
+    allCompletions.add(createCompletion("v1", 1, 2));
+  }
+
+  /**
+   * Reconfiguration must wait until both sources are configured, then happen exactly once with
+   * parallelism 2 * 3 = 6, a null location hint, and edge payloads carrying task counts {2, 3}.
+   */
+  @Test(timeout = 5000)
+  public void testReconfigureVertex() throws Exception {
+    ArgumentCaptor<Integer> parallelismCaptor = ArgumentCaptor.forClass(Integer.class);
+    vertexManager.onVertexStateUpdated(new VertexStateUpdate("v0", VertexState.CONFIGURED));
+    verify(context, never()).reconfigureVertex(
+      anyInt(), any(VertexLocationHint.class), anyMapOf(String.class, EdgeProperty.class));
+    vertexManager.onVertexStateUpdated(new VertexStateUpdate("v1", VertexState.CONFIGURED));
+    verify(context, times(1)).reconfigureVertex(parallelismCaptor.capture(),
+      isNull(VertexLocationHint.class), edgePropertiesCaptor.capture());
+    assertEquals(6, (int)parallelismCaptor.getValue());
+    Map<String, EdgeProperty> edgeProperties = edgePropertiesCaptor.getValue();
+    for (EdgeProperty edgeProperty : edgeProperties.values()) {
+      UserPayload payload = edgeProperty.getEdgeManagerDescriptor().getUserPayload();
+      CartesianProductEdgeManagerConfig newConfig =
+        CartesianProductEdgeManagerConfig.fromUserPayload(payload);
+      assertArrayEquals(new int[]{2,3}, newConfig.getNumTasks());
+    }
+  }
+
+  /**
+   * Vertex started and both sources configured first: nothing is scheduled until task 0 of both
+   * v0 and v1 has completed, at which point exactly destination task 0 is scheduled.
+   */
+  @Test(timeout = 5000)
+  public void testCompletionAfterReconfigured() throws Exception {
+    vertexManager.onVertexStarted(new ArrayList<TaskAttemptIdentifier>());
+    vertexManager.onVertexStateUpdated(new VertexStateUpdate("v0", VertexState.CONFIGURED));
+    vertexManager.onVertexStateUpdated(new VertexStateUpdate("v1", VertexState.CONFIGURED));
+    verify(context, never()).scheduleTasks(Matchers.<List<ScheduleTaskRequest>>any());
+    vertexManager.onSourceTaskCompleted(allCompletions.get(0));
+    verify(context, never()).scheduleTasks(Matchers.<List<ScheduleTaskRequest>>any());
+    vertexManager.onSourceTaskCompleted(allCompletions.get(2));
+    verify(context, times(1)).scheduleTasks(scheduleTaskRequestCaptor.capture());
+    List<ScheduleTaskRequest> requests = scheduleTaskRequestCaptor.getValue();
+    assertNotNull(requests);
+    assertEquals(1, requests.size());
+    assertEquals(0, requests.get(0).getTaskIndex());
+  }
+
+  /**
+   * Source completions arriving before the sources are configured must not schedule anything;
+   * destination task 0 is scheduled only once the second source becomes configured.
+   */
+  @Test(timeout = 5000)
+  public void testCompletionBeforeReconfigured() throws Exception {
+    vertexManager.onVertexStarted(new ArrayList<TaskAttemptIdentifier>());
+    vertexManager.onSourceTaskCompleted(allCompletions.get(0));
+    verify(context, never()).scheduleTasks(Matchers.<List<ScheduleTaskRequest>>any());
+    vertexManager.onSourceTaskCompleted(allCompletions.get(2));
+    verify(context, never()).scheduleTasks(Matchers.<List<ScheduleTaskRequest>>any());
+    vertexManager.onVertexStateUpdated(new VertexStateUpdate("v0", VertexState.CONFIGURED));
+    verify(context, never()).scheduleTasks(Matchers.<List<ScheduleTaskRequest>>any());
+    vertexManager.onVertexStateUpdated(new VertexStateUpdate("v1", VertexState.CONFIGURED));
+    verify(context, times(1)).scheduleTasks(scheduleTaskRequestCaptor.capture());
+    List<ScheduleTaskRequest> requests = scheduleTaskRequestCaptor.getValue();
+    assertNotNull(requests);
+    assertEquals(1, requests.size());
+    assertEquals(0, requests.get(0).getTaskIndex());
+  }
+
+  /**
+   * Both sources configured before the vertex starts: completions handed to onVertexStarted
+   * (task 0 of v0 and task 0 of v1) lead to destination task 0 being scheduled.
+   */
+  @Test(timeout = 5000)
+  public void testStartAfterReconfigured() throws Exception {
+    vertexManager.onVertexStateUpdated(new VertexStateUpdate("v0", VertexState.CONFIGURED));
+    vertexManager.onVertexStateUpdated(new VertexStateUpdate("v1", VertexState.CONFIGURED));
+
+    List<TaskAttemptIdentifier> completion = new ArrayList<>();
+    completion.add(allCompletions.get(0));
+    completion.add(allCompletions.get(2));
+    vertexManager.onVertexStarted(completion);
+    verify(context, times(1)).scheduleTasks(scheduleTaskRequestCaptor.capture());
+    List<ScheduleTaskRequest> requests = scheduleTaskRequestCaptor.getValue();
+    assertNotNull(requests);
+    assertEquals(1, requests.size());
+    assertEquals(0, requests.get(0).getTaskIndex());
+  }
+
+  /**
+   * Starting with every source task already completed, but no source configured yet, must not
+   * schedule anything.
+   */
+  @Test(timeout = 5000)
+  public void testStartBeforeReconfigured() throws Exception {
+    vertexManager.onVertexStarted(allCompletions);
+    verify(context, never()).scheduleTasks(Matchers.<List<ScheduleTaskRequest>>any());
+  }
+
+  /**
+   * Smoke test for a source vertex with zero tasks: with v1 reporting 0 tasks, start,
+   * configuration and completion of both v0 tasks must run without throwing. No interactions
+   * with the context are verified here.
+   */
+  @Test(timeout = 5000)
+  public void testZeroSrcTask() throws Exception {
+    context = mock(VertexManagerPluginContext.class);
+    vertexManager = new CartesianProductVertexManagerUnpartitioned(context);
+    when(context.getVertexNumTasks(eq("v0"))).thenReturn(2);
+    when(context.getVertexNumTasks(eq("v1"))).thenReturn(0);
+
+    CartesianProductVertexManagerConfig config =
+      new CartesianProductVertexManagerConfig(false, new String[]{"v0","v1"}, null, 0, 0, null);
+    vertexManager.initialize(config);
+    allCompletions = new ArrayList<>();
+    // the two distinct tasks of v0 (task 0 and task 1)
+    allCompletions.add(createCompletion("v0", 0, 0));
+    allCompletions.add(createCompletion("v0", 0, 1));
+
+    vertexManager.onVertexStarted(new ArrayList<TaskAttemptIdentifier>());
+    vertexManager.onVertexStateUpdated(new VertexStateUpdate("v0", VertexState.CONFIGURED));
+    vertexManager.onVertexStateUpdated(new VertexStateUpdate("v1", VertexState.CONFIGURED));
+    vertexManager.onSourceTaskCompleted(allCompletions.get(0));
+    vertexManager.onSourceTaskCompleted(allCompletions.get(1));
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tez/blob/1a068b23/tez-tests/src/test/java/org/apache/tez/test/TestFaultTolerance.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestFaultTolerance.java b/tez-tests/src/test/java/org/apache/tez/test/TestFaultTolerance.java
index 2d10f94..764ef0f 100644
--- a/tez-tests/src/test/java/org/apache/tez/test/TestFaultTolerance.java
+++ b/tez-tests/src/test/java/org/apache/tez/test/TestFaultTolerance.java
@@ -19,8 +19,17 @@
 package org.apache.tez.test;
 
 import java.io.IOException;
+import java.util.Arrays;
 import java.util.Random;
 
+import org.apache.tez.common.TezUtils;
+import org.apache.tez.dag.api.EdgeManagerPluginDescriptor;
+import org.apache.tez.dag.api.ProcessorDescriptor;
+import org.apache.tez.dag.api.UserPayload;
+import org.apache.tez.dag.api.VertexManagerPluginDescriptor;
+import org.apache.tez.runtime.library.cartesianproduct.CartesianProductConfig;
+import org.apache.tez.runtime.library.cartesianproduct.CartesianProductEdgeManager;
+import org.apache.tez.runtime.library.cartesianproduct.CartesianProductVertexManager;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -778,5 +787,68 @@ public class TestFaultTolerance {
     // dag will fail with 2 attempts failing from vertex v1
     runDAGAndVerify(dag, DAGStatus.State.FAILED, 2, "no progress");
   }
-  
+
+  /**
+   * Runs a 2x2 unpartitioned cartesian product (v1 x v2 -> v3) whose 4th destination task is
+   * configured to report an input failure, and expects the DAG to succeed.
+   *
+   * In unpartitioned cartesian product, failure fraction should be #unique failure/#consumer that
+   * depends on the src task, so with a configured limit of 0.25 the fraction for this failure
+   * should be 1/2, not 1/4.
+   *
+   * NOTE(review): the 0.25 limit is set on a local Configuration that is not referenced again in
+   * this method -- confirm that it actually reaches the DAG.
+   * @throws Exception
+   */
+  @Test
+  public void testCartesianProduct() throws Exception {
+    Configuration dagConf = new Configuration();
+    dagConf.setDouble(TezConfiguration.TEZ_TASK_MAX_ALLOWED_OUTPUT_FAILURES_FRACTION, 0.25);
+
+    // processor settings, shared by all three vertices but keyed on vertex "v3"
+    Configuration processorConf = new Configuration();
+    processorConf.setInt(TestProcessor.getVertexConfName(
+      TestProcessor.TEZ_FAILING_PROCESSOR_VERIFY_TASK_INDEX, "v3"), 3);
+    processorConf.setInt(TestProcessor.getVertexConfName(
+      TestProcessor.TEZ_FAILING_PROCESSOR_VERIFY_VALUE, "v3"), 5);
+    ProcessorDescriptor processor = ProcessorDescriptor.create(TestProcessor.class.getName())
+      .setUserPayload(TezUtils.createUserPayloadFromConf(processorConf));
+
+    // input settings: fail input 0 of task 3, attempt 0, on vertex "v3"
+    Configuration failingInputConf = new Configuration();
+    failingInputConf.setBoolean(TestInput.getVertexConfName(
+      TestInput.TEZ_FAILING_INPUT_DO_FAIL, "v3"), true);
+    failingInputConf.setInt(TestInput.getVertexConfName(
+      TestInput.TEZ_FAILING_INPUT_FAILING_TASK_INDEX, "v3"), 3);
+    failingInputConf.setInt(TestInput.getVertexConfName(
+      TestInput.TEZ_FAILING_INPUT_FAILING_TASK_ATTEMPT, "v3"), 0);
+    failingInputConf.setInt(TestInput.getVertexConfName(
+      TestInput.TEZ_FAILING_INPUT_FAILING_INPUT_INDEX, "v3"), 0);
+    failingInputConf.setInt(TestInput.getVertexConfName(
+      TestInput.TEZ_FAILING_INPUT_FAILING_UPTO_INPUT_ATTEMPT, "v3"), 0);
+    UserPayload failingInputPayload = TezUtils.createUserPayloadFromConf(failingInputConf);
+
+    // one cartesian product payload serves both the vertex manager and the edge manager
+    UserPayload productPayload =
+      new CartesianProductConfig(Arrays.asList(new String[] {"v1", "v2"}))
+        .toUserPayload(new TezConfiguration());
+
+    Vertex v1 = Vertex.create("v1", processor, 2);
+    Vertex v2 = Vertex.create("v2", processor, 2);
+    // no parallelism is given for v3 here
+    Vertex v3 = Vertex.create("v3", processor);
+    v3.setVertexManagerPlugin(
+      VertexManagerPluginDescriptor.create(CartesianProductVertexManager.class.getName())
+        .setUserPayload(productPayload));
+
+    EdgeManagerPluginDescriptor edgeManager =
+      EdgeManagerPluginDescriptor.create(CartesianProductEdgeManager.class.getName())
+        .setUserPayload(productPayload);
+    EdgeProperty productEdge =
+      EdgeProperty.create(edgeManager, DataMovementType.CUSTOM,
+        DataSourceType.PERSISTED, SchedulingType.SEQUENTIAL, TestOutput.getOutputDesc(null),
+        TestInput.getInputDesc(failingInputPayload));
+
+    DAG dag = DAG.create("dag");
+    dag.addVertex(v1).addVertex(v2).addVertex(v3);
+    dag.addEdge(Edge.create(v1, v3, productEdge)).addEdge(Edge.create(v2, v3, productEdge));
+
+    // run dag
+    runDAGAndVerify(dag, DAGStatus.State.SUCCEEDED);
+  }
 }


[2/2] tez git commit: TEZ-3230. Implement vertex manager and edge manager of cartesian product edge. (Zhiyuan Yang via mingma)

Posted by mi...@apache.org.
TEZ-3230. Implement vertex manager and edge manager of cartesian product edge. (Zhiyuan Yang via mingma)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/1a068b23
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/1a068b23
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/1a068b23

Branch: refs/heads/master
Commit: 1a068b2391684563bb53a0720848b7673d8dc46c
Parents: af82469
Author: Ming Ma <mi...@twitter.com>
Authored: Tue Sep 6 10:49:50 2016 -0700
Committer: Ming Ma <mi...@twitter.com>
Committed: Tue Sep 6 10:49:50 2016 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../tez/dag/app/dag/impl/TaskAttemptImpl.java   |   3 +-
 .../apache/tez/examples/CartesianProduct.java   | 208 ++++++++++++++
 tez-runtime-library/findbugs-exclude.xml        |  18 ++
 tez-runtime-library/pom.xml                     |   1 +
 .../CartesianProductCombination.java            | 164 +++++++++++
 .../CartesianProductConfig.java                 | 255 +++++++++++++++++
 .../CartesianProductEdgeManager.java            | 106 +++++++
 .../CartesianProductEdgeManagerConfig.java      |  64 +++++
 .../CartesianProductEdgeManagerPartitioned.java | 124 ++++++++
 .../CartesianProductEdgeManagerReal.java        |  62 ++++
 ...artesianProductEdgeManagerUnpartitioned.java |  98 +++++++
 .../CartesianProductFilter.java                 |  47 +++
 .../CartesianProductFilterDescriptor.java       |  28 ++
 .../CartesianProductVertexManager.java          | 139 +++++++++
 .../CartesianProductVertexManagerConfig.java    |  75 +++++
 ...artesianProductVertexManagerPartitioned.java | 176 ++++++++++++
 .../CartesianProductVertexManagerReal.java      |  50 ++++
 ...tesianProductVertexManagerUnpartitioned.java | 178 ++++++++++++
 .../main/proto/CartesianProductPayload.proto    |  31 ++
 .../TestCartesianProductCombination.java        | 110 +++++++
 .../TestCartesianProductConfig.java             | 106 +++++++
 .../TestCartesianProductEdgeManager.java        |  68 +++++
 ...tCartesianProductEdgeManagerPartitioned.java | 284 +++++++++++++++++++
 ...artesianProductEdgeManagerUnpartitioned.java | 240 ++++++++++++++++
 .../TestCartesianProductVertexManager.java      |  67 +++++
 ...artesianProductVertexManagerPartitioned.java | 230 +++++++++++++++
 ...tesianProductVertexManagerUnpartitioned.java | 194 +++++++++++++
 .../org/apache/tez/test/TestFaultTolerance.java |  74 ++++-
 29 files changed, 3198 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/1a068b23/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index b73dd3f..0225db6 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -7,6 +7,7 @@ INCOMPATIBLE CHANGES
 
 ALL CHANGES:
 
+  TEZ-3230. Implement vertex manager and edge manager of cartesian product edge.
   TEZ-3326. Display JVM system properties in AM and task logs.
   TEZ-3009. Errors that occur during container task acquisition are not logged.
   TEZ-2852. TestVertexImpl fails due to race in AsyncDispatcher.

http://git-wip-us.apache.org/repos/asf/tez/blob/1a068b23/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
index e39315b..e5f3e71 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
@@ -1705,8 +1705,7 @@ public class TaskAttemptImpl implements TaskAttempt,
       }
       
       int readErrorTimespanSec = (int)((time - firstErrReportTime)/1000);
-      boolean crossTimeDeadline = readErrorTimespanSec >=
-      MAX_ALLOWED_TIME_FOR_TASK_READ_ERROR_SEC ? true : false;
+      boolean crossTimeDeadline = readErrorTimespanSec >= MAX_ALLOWED_TIME_FOR_TASK_READ_ERROR_SEC;
 
       float failureFraction = ((float) attempt.uniquefailedOutputReports.size())
           / outputFailedEvent.getConsumerTaskNumber();

http://git-wip-us.apache.org/repos/asf/tez/blob/1a068b23/tez-examples/src/main/java/org/apache/tez/examples/CartesianProduct.java
----------------------------------------------------------------------
diff --git a/tez-examples/src/main/java/org/apache/tez/examples/CartesianProduct.java b/tez-examples/src/main/java/org/apache/tez/examples/CartesianProduct.java
new file mode 100644
index 0000000..9f3d490
--- /dev/null
+++ b/tez-examples/src/main/java/org/apache/tez/examples/CartesianProduct.java
@@ -0,0 +1,208 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.examples;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.tez.client.TezClient;
+import org.apache.tez.dag.api.DAG;
+import org.apache.tez.dag.api.Edge;
+import org.apache.tez.dag.api.EdgeManagerPluginDescriptor;
+import org.apache.tez.dag.api.EdgeProperty;
+import org.apache.tez.dag.api.ProcessorDescriptor;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.UserPayload;
+import org.apache.tez.dag.api.Vertex;
+import org.apache.tez.dag.api.VertexManagerPluginDescriptor;
+import org.apache.tez.mapreduce.input.MRInput;
+import org.apache.tez.mapreduce.output.MROutput;
+import org.apache.tez.mapreduce.processor.SimpleMRProcessor;
+import org.apache.tez.runtime.api.ProcessorContext;
+import org.apache.tez.runtime.library.api.KeyValueReader;
+import org.apache.tez.runtime.library.api.KeyValueWriter;
+import org.apache.tez.runtime.library.api.Partitioner;
+import org.apache.tez.runtime.library.cartesianproduct.CartesianProductConfig;
+import org.apache.tez.runtime.library.cartesianproduct.CartesianProductEdgeManager;
+import org.apache.tez.runtime.library.cartesianproduct.CartesianProductVertexManager;
+import org.apache.tez.runtime.library.conf.UnorderedKVEdgeConfig;
+import org.apache.tez.runtime.library.conf.UnorderedPartitionedKVEdgeConfig;
+import org.apache.tez.runtime.library.processor.SimpleProcessor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.StringTokenizer;
+
+/**
+ * This job has three vertices: two TokenProcessors and one JoinProcessor. Each TokenProcessor
+ * handles one input directory and generates tokens. In partitioned mode, CustomPartitioner
+ * separates tokens into 2 partitions according to the parity of token's first char. Then
+ * JoinProcessor does cartesian product of the token sets it reads from the two source vertices.
+ */
+public class CartesianProduct extends TezExampleBase {
+  private static final String INPUT = "Input1";
+  private static final String OUTPUT = "Output";
+  private static final String VERTEX1 = "Vertex1";
+  private static final String VERTEX2 = "Vertex2";
+  private static final String VERTEX3 = "Vertex3";
+  private static final String PARTITIONED = "-partitioned";
+  private static final String UNPARTITIONED = "-unpartitioned";
+  private static final Logger LOG = LoggerFactory.getLogger(CartesianProduct.class);
+  private static final int numPartition = 2;
+  private static final String[] sourceVertices = new String[] {VERTEX1, VERTEX2};
+
+  /**
+   * Splits every value read from the input into tokens (default StringTokenizer delimiters) and
+   * writes one (token, 1) pair per token to the output connected to Vertex3.
+   */
+  public static class TokenProcessor extends SimpleProcessor {
+    public TokenProcessor(ProcessorContext context) {
+      super(context);
+    }
+
+    @Override
+    public void run() throws Exception {
+      Preconditions.checkArgument(getInputs().size() == 1);
+      Preconditions.checkArgument(getOutputs().size() == 1);
+      KeyValueReader kvReader = (KeyValueReader) getInputs().get(INPUT).getReader();
+      KeyValueWriter kvWriter = (KeyValueWriter) getOutputs().get(VERTEX3).getWriter();
+      while (kvReader.next()) {
+        StringTokenizer itr = new StringTokenizer(kvReader.getCurrentValue().toString());
+        while (itr.hasMoreTokens()) {
+          kvWriter.write(new Text(itr.nextToken()), new IntWritable(1));
+        }
+      }
+    }
+  }
+
+  /**
+   * Collects all keys read from Vertex2 into a set, then, for every key read from Vertex1,
+   * writes one (left, right) pair per collected Vertex2 key to the output.
+   */
+  public static class JoinProcessor extends SimpleMRProcessor {
+    public JoinProcessor(ProcessorContext context) {
+      super(context);
+    }
+
+    @Override
+    public void run() throws Exception {
+      KeyValueWriter kvWriter = (KeyValueWriter) getOutputs().get(OUTPUT).getWriter();
+      KeyValueReader kvReader1 = (KeyValueReader) getInputs().get(VERTEX1).getReader();
+      KeyValueReader kvReader2 = (KeyValueReader) getInputs().get(VERTEX2).getReader();
+      // the right side is held in memory as a set, so duplicate Vertex2 keys collapse
+      Set<String> rightSet = new HashSet<>();
+
+      while (kvReader2.next()) {
+        rightSet.add(kvReader2.getCurrentKey().toString());
+      }
+
+      while (kvReader1.next()) {
+        String left = kvReader1.getCurrentKey().toString();
+        for (String right : rightSet) {
+          kvWriter.write(left, right);
+        }
+      }
+    }
+  }
+
+  /**
+   * Assigns a key to a partition based on the first char of its string form, modulo the
+   * class-level partition count (2).
+   */
+  public static class CustomPartitioner implements Partitioner {
+    @Override
+    public int getPartition(Object key, Object value, int numPartitions) {
+      // NOTE(review): uses the class constant numPartition, not the numPartitions argument --
+      // confirm the two always agree for this example
+      return key.toString().charAt(0) % numPartition;
+    }
+  }
+
+  /**
+   * Builds the three-vertex DAG: Vertex1 and Vertex2 tokenize one input path each and feed
+   * Vertex3 over custom edges managed by CartesianProductEdgeManager.
+   *
+   * @param tezConf configuration used for the data source/sink and the cartesian product payload
+   * @param inputPath1 input path read by Vertex1
+   * @param inputPath2 input path read by Vertex2
+   * @param outputPath output path written by Vertex3
+   * @param isPartitioned true to use the partitioned config (2 partitions per source vertex) and
+   *                      a partitioned edge, false for the unpartitioned variants
+   * @return the assembled DAG
+   * @throws IOException declared for failures while building the configuration payloads
+   */
+  private DAG createDAG(TezConfiguration tezConf, String inputPath1, String inputPath2,
+                        String outputPath, boolean isPartitioned) throws IOException {
+    Vertex v1 = Vertex.create(VERTEX1, ProcessorDescriptor.create(TokenProcessor.class.getName()));
+    // turn off groupSplit so that each input file incurs one task
+    v1.addDataSource(INPUT,
+      MRInput.createConfigBuilder(new Configuration(tezConf), TextInputFormat.class, inputPath1)
+             .groupSplits(false).build());
+    Vertex v2 = Vertex.create(VERTEX2, ProcessorDescriptor.create(TokenProcessor.class.getName()));
+    v2.addDataSource(INPUT,
+      MRInput.createConfigBuilder(new Configuration(tezConf), TextInputFormat.class, inputPath2)
+              .groupSplits(false).build());
+    CartesianProductConfig cartesianProductConfig;
+    if (isPartitioned) {
+      Map<String, Integer> vertexPartitionMap = new HashMap<>();
+      for (String vertex : sourceVertices) {
+        vertexPartitionMap.put(vertex, numPartition);
+      }
+      cartesianProductConfig = new CartesianProductConfig(vertexPartitionMap);
+    } else {
+      cartesianProductConfig = new CartesianProductConfig(Arrays.asList(sourceVertices));
+    }
+    // the same payload configures both the vertex manager and the edge manager
+    UserPayload userPayload = cartesianProductConfig.toUserPayload(tezConf);
+    Vertex v3 = Vertex.create(VERTEX3, ProcessorDescriptor.create(JoinProcessor.class.getName()));
+    v3.addDataSink(OUTPUT,
+      MROutput.createConfigBuilder(new Configuration(tezConf), TextOutputFormat.class, outputPath)
+              .build());
+    v3.setVertexManagerPlugin(
+      VertexManagerPluginDescriptor.create(CartesianProductVertexManager.class.getName())
+                                   .setUserPayload(userPayload));
+
+    DAG dag = DAG.create("CrossProduct").addVertex(v1).addVertex(v2).addVertex(v3);
+    EdgeManagerPluginDescriptor edgeManagerDescriptor =
+      EdgeManagerPluginDescriptor.create(CartesianProductEdgeManager.class.getName());
+    edgeManagerDescriptor.setUserPayload(userPayload);
+    EdgeProperty edgeProperty;
+    if (isPartitioned) {
+      UnorderedPartitionedKVEdgeConfig edgeConf =
+        UnorderedPartitionedKVEdgeConfig.newBuilder(Text.class.getName(), IntWritable.class.getName(),
+          CustomPartitioner.class.getName()).build();
+      edgeProperty = edgeConf.createDefaultCustomEdgeProperty(edgeManagerDescriptor);
+    } else {
+      UnorderedKVEdgeConfig edgeConf =
+        UnorderedKVEdgeConfig.newBuilder(Text.class.getName(), IntWritable.class.getName()).build();
+      edgeProperty = edgeConf.createDefaultCustomEdgeProperty(edgeManagerDescriptor);
+    }
+    dag.addEdge(Edge.create(v1, v3, edgeProperty)).addEdge(Edge.create(v2, v3, edgeProperty));
+
+    return dag;
+  }
+
+  /**
+   * Prints the expected command line: a mode flag followed by two input dirs and an output dir.
+   */
+  @Override
+  protected void printUsage() {
+    // the closing bracket ends the mode-flag alternatives opened by "["
+    System.err.println("Usage: args: [" + PARTITIONED + "|" + UNPARTITIONED
+      + "] <input_dir1> <input_dir2> <output_dir>");
+  }
+
+  /**
+   * Accepts exactly four arguments whose first one is either mode flag.
+   *
+   * @return 0 when the arguments are acceptable, -1 otherwise
+   */
+  @Override
+  protected int validateArgs(String[] otherArgs) {
+    return (otherArgs.length != 4 || (!otherArgs[0].equals(PARTITIONED)
+      && !otherArgs[0].equals(UNPARTITIONED))) ? -1 : 0;
+  }
+
+  /**
+   * Builds the DAG from the arguments (args[0] is the mode flag, args[1] and args[2] the input
+   * paths, args[3] the output path) and runs it.
+   */
+  @Override
+  protected int runJob(String[] args, TezConfiguration tezConf,
+      TezClient tezClient) throws Exception {
+    DAG dag = createDAG(tezConf, args[1], args[2],
+        args[3], args[0].equals(PARTITIONED));
+    return runDag(dag, isCountersLog(), LOG);
+  }
+
+  /** Runs the example through ToolRunner and exits with its return code. */
+  public static void main(String[] args) throws Exception {
+    int res = ToolRunner.run(new Configuration(), new CartesianProduct(), args);
+    System.exit(res);
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/tez/blob/1a068b23/tez-runtime-library/findbugs-exclude.xml
----------------------------------------------------------------------
diff --git a/tez-runtime-library/findbugs-exclude.xml b/tez-runtime-library/findbugs-exclude.xml
index 4e15edc..d3b6245 100644
--- a/tez-runtime-library/findbugs-exclude.xml
+++ b/tez-runtime-library/findbugs-exclude.xml
@@ -123,6 +123,24 @@
   </Match>
 
   <Match>
+    <Class name="org.apache.tez.runtime.library.cartesianproduct.CartesianProductUserPayload$CartesianProductConfigProto"/>
+    <Field name="unknownFields"/>
+    <Bug pattern="SE_BAD_FIELD"/>
+  </Match>
+
+  <Match>
+    <Class name="org.apache.tez.runtime.library.cartesianproduct.CartesianProductUserPayload$CartesianProductConfigProto"/>
+    <Field name="PARSER"/>
+    <Bug pattern="MS_SHOULD_BE_FINAL"/>
+  </Match>
+
+  <Match>
+    <Class name="org.apache.tez.runtime.library.cartesianproduct.CartesianProductUserPayload$CartesianProductConfigProto$Builder"/>
+    <Method name="maybeForceBuilderInitialization"/>
+    <Bug pattern="UCF_USELESS_CONTROL_FLOW"/>
+  </Match>
+
+  <Match>
     <Bug pattern="EI_EXPOSE_REP"/>
     <Or>
       <Class name="org.apache.tez.runtime.library.common.sort.impl.ExteralSorter" />

http://git-wip-us.apache.org/repos/asf/tez/blob/1a068b23/tez-runtime-library/pom.xml
----------------------------------------------------------------------
diff --git a/tez-runtime-library/pom.xml b/tez-runtime-library/pom.xml
index 9831e50..b676933 100644
--- a/tez-runtime-library/pom.xml
+++ b/tez-runtime-library/pom.xml
@@ -129,6 +129,7 @@
                 <directory>${basedir}/src/main/proto</directory>
                 <includes>
                   <include>ShufflePayloads.proto</include>
+                  <include>CartesianProductPayload.proto</include>
                 </includes>
               </source>
               <output>${project.build.directory}/generated-sources/java</output>

http://git-wip-us.apache.org/repos/asf/tez/blob/1a068b23/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductCombination.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductCombination.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductCombination.java
new file mode 100644
index 0000000..a46993d
--- /dev/null
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductCombination.java
@@ -0,0 +1,164 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.runtime.library.cartesianproduct;
+
+import com.google.common.base.Preconditions;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Represents the combination of source partitions or tasks.
+ *
+ * For example, if we have two source vertices and each generates two partitions, we will have 2*2=4
+ * destination tasks. The mapping from source partition/task to destination task is like this:
+ * <0, 0> -> 0, <0, 1> -> 1, <1, 0> -> 2, <1, 1> -> 3;
+ *
+ * Basically, it stores the source partition/task combination and can compute the corresponding
+ * destination task. It can also figure out the source combination from a given destination task.
+ * Task id is mapped in the ascending order of combinations, starting from 0. <field>factor</field>
+ * is the helper array to compute the task id, so task id = (combination) dot-product (factor).
+ *
+ * You can traverse all combinations with <method>firstTask</method> and <method>nextTask</method>,
+ * like <0, 0> -> <0, 1> -> <1, 0> -> <1, 1>.
+ *
+ * Or you can also traverse all combinations that have one specific partition fixed with
+ * <method>firstTaskWithFixedPartition</method> and <method>nextTaskWithFixedPartition</method>,
+ * like <0, 1, 0> -> <0, 1, 1> -> <1, 1, 0> -> <1, 1, 1> (all combinations with 2nd vertex's 2nd
+ * partition).
+ */
+class CartesianProductCombination {
+  // numPartitions for partitioned case, numTasks for unpartitioned case
+  private int[] numPartitionOrTask;
+  // at which position (in source vertices array) our vertex is
+  private int positionId = -1;
+  // The i-th element Ci represents partition/task Ci of source vertex i.
+  private final Integer[] combination;
+  // the weight of each vertex when computing the task id
+  private final Integer[] factor;
+
+  public CartesianProductCombination(int[] numPartitionOrTask) {
+    this.numPartitionOrTask = Arrays.copyOf(numPartitionOrTask, numPartitionOrTask.length);
+    combination = new Integer[numPartitionOrTask.length];
+    factor = new Integer[numPartitionOrTask.length];
+    factor[factor.length-1] = 1;
+    for (int i = combination.length-2; i >= 0; i--) {
+      factor[i] = factor[i+1]*numPartitionOrTask[i+1];
+    }
+  }
+
+  public CartesianProductCombination(int[] numPartitionOrTask, int positionId) {
+    this(numPartitionOrTask);
+    this.positionId = positionId;
+  }
+
+  /**
+   * @return a read only view of current combination
+   */
+  public List<Integer> getCombination() {
+    return Collections.unmodifiableList(Arrays.asList(combination));
+  }
+
+  /**
+   * reset to the first combination that has the given partition id at the current position
+   * @param partition partition id to hold fixed at <field>positionId</field>
+   */
+  public void firstTaskWithFixedPartition(int partition) {
+    Preconditions.checkArgument(positionId >= 0 && positionId < combination.length);
+    Arrays.fill(combination, 0);
+    combination[positionId] = partition;
+  }
+
+  /**
+   * advance to the next combination while keeping the partition in current position unchanged
+   * @return false if there is no next combination
+   */
+  public boolean nextTaskWithFixedPartition() {
+    Preconditions.checkArgument(positionId >= 0 && positionId < combination.length);
+    int i;
+    for (i = combination.length-1; i >= 0; i--) {
+      if (i != positionId && combination[i] != numPartitionOrTask[i]-1) {
+        break;
+      }
+    }
+
+    if (i < 0) {
+      return false;
+    }
+
+    combination[i]++;
+
+    for (i++; i < combination.length; i++) {
+      if (i != positionId) {
+        combination[i] = 0;
+      }
+    }
+
+    return true;
+  }
+
+  /**
+   * reset to the first combination, i.e. partition/task 0 of every source vertex
+   */
+  public void firstTask() {
+    Arrays.fill(combination, 0);
+  }
+
+  /**
+   * advance to the next combination in ascending order (every position may change)
+   * @return false if there is no next combination
+   */
+  public boolean nextTask() {
+    int i;
+    for (i = combination.length-1; i >= 0; i--) {
+      if (combination[i] != numPartitionOrTask[i]-1) {
+        break;
+      }
+    }
+
+    if (i < 0) {
+      return false;
+    }
+
+    combination[i]++;
+    Arrays.fill(combination, i+1, combination.length, 0);
+    return true;
+  }
+
+  /**
+   * @return corresponding task id for current combination
+   */
+  public int getTaskId() {
+    int taskId = 0;
+    for (int i = 0; i < combination.length; i++) {
+      taskId += combination[i]*factor[i];
+    }
+    return taskId;
+  }
+
+  public static CartesianProductCombination fromTaskId(int[] numPartitionOrTask,
+                                                       int taskId) {
+    CartesianProductCombination result = new CartesianProductCombination(numPartitionOrTask);
+    for (int i = 0; i < result.combination.length; i++) {
+      result.combination[i] = taskId/result.factor[i];
+      taskId %= result.factor[i];
+    }
+    return result;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/1a068b23/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductConfig.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductConfig.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductConfig.java
new file mode 100644
index 0000000..b682182
--- /dev/null
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductConfig.java
@@ -0,0 +1,255 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.runtime.library.cartesianproduct;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.primitives.Ints;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.InvalidProtocolBufferException;
+import org.apache.hadoop.classification.InterfaceStability.Evolving;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.UserPayload;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.tez.runtime.library.cartesianproduct.CartesianProductUserPayload.CartesianProductConfigProto;
+
+/**
+ * <class>CartesianProductConfig</class> is used to configure both
+ * <class>CartesianProductVertexManager</class> and <class>CartesianProductEdgeManager</class>.
+ * Users need to specify at least the source vertices and, in the partitioned case, the number of
+ * partitions of each vertex's output. In the partitioned case, a filter may also be specified here
+ * (via <class>CartesianProductFilterDescriptor</class>). Users may also configure the min/max
+ * fractions used in slow start.
+ */
+@Evolving
+public class CartesianProductConfig {
+  private final boolean isPartitioned;
+  private final String[] sourceVertices;
+  private final int[] numPartitions;
+  private final CartesianProductFilterDescriptor filterDescriptor;
+
+  /**
+   * create config for unpartitioned case
+   * @param sourceVertices list of source vertices names
+   */
+  public CartesianProductConfig(List<String> sourceVertices) {
+    Preconditions.checkArgument(sourceVertices != null, "source vertices list cannot be null");
+    Preconditions.checkArgument(sourceVertices.size() > 1,
+      "there must be more than 1 source " + "vertices, currently only " + sourceVertices.size());
+
+    this.isPartitioned = false;
+    this.sourceVertices = sourceVertices.toArray(new String[sourceVertices.size()]);
+    this.numPartitions = null;
+    this.filterDescriptor = null;
+  }
+
+  /**
+   * create config for partitioned case without filter
+   * @param vertexPartitionMap the map from vertex name to its number of partitions
+   */
+  public CartesianProductConfig(Map<String, Integer> vertexPartitionMap) {
+    this(vertexPartitionMap, null);
+  }
+
+  /**
+   * create config for partitioned case with filter
+   * @param vertexPartitionMap the map from vertex name to its number of partitions
+   * @param filterDescriptor descriptor of the combination filter, or null for no filter
+   */
+  public CartesianProductConfig(Map<String, Integer> vertexPartitionMap,
+                                CartesianProductFilterDescriptor filterDescriptor) {
+    Preconditions.checkArgument(vertexPartitionMap != null, "vertex-partition map cannot be null");
+    Preconditions.checkArgument(vertexPartitionMap.size() > 1,
+      "there must be more than 1 source " + "vertices, currently only " + vertexPartitionMap.size());
+
+    this.isPartitioned = true;
+    this.numPartitions = new int[vertexPartitionMap.size()];
+    this.sourceVertices = new String[vertexPartitionMap.size()];
+    this.filterDescriptor = filterDescriptor;
+
+    int i = 0;
+    for (Map.Entry<String, Integer> entry : vertexPartitionMap.entrySet()) {
+      this.sourceVertices[i] = entry.getKey();
+      this.numPartitions[i] = entry.getValue();
+      i++;
+    }
+
+    checkNumPartitions();
+  }
+
+  /**
+   * create config for partitioned case, with specified source vertices order
+   * @param numPartitions number of partitions of each source vertex; copied, not retained
+   * @param sourceVertices source vertex names, same order as numPartitions; copied, not retained
+   * @param filterDescriptor descriptor of the combination filter, or null for no filter
+   */
+  @VisibleForTesting
+  protected CartesianProductConfig(int[] numPartitions, String[] sourceVertices,
+                                   CartesianProductFilterDescriptor filterDescriptor) {
+    Preconditions.checkArgument(numPartitions != null, "partitions count array can't be null");
+    Preconditions.checkArgument(sourceVertices != null, "source vertices array can't be null");
+    Preconditions.checkArgument(numPartitions.length == sourceVertices.length,
+      "partitions count array(length: " + numPartitions.length + ") and source vertices array " +
+        "(length: " + sourceVertices.length + ") cannot have different length");
+    Preconditions.checkArgument(sourceVertices.length > 1,
+      "there must be more than 1 source " + "vertices, currently only " + sourceVertices.length);
+
+    this.isPartitioned = true;
+    this.numPartitions = numPartitions.clone();
+    this.sourceVertices = sourceVertices.clone();
+    this.filterDescriptor = filterDescriptor;
+
+    checkNumPartitions();
+  }
+
+  /**
+   * create config for both cases, used by subclass
+   */
+  protected CartesianProductConfig(boolean isPartitioned, int[] numPartitions,
+                                   String[] sourceVertices,
+                                   CartesianProductFilterDescriptor filterDescriptor) {
+    this.isPartitioned = isPartitioned;
+    this.numPartitions = numPartitions;
+    this.sourceVertices = sourceVertices;
+    this.filterDescriptor = filterDescriptor;
+  }
+
+  @VisibleForTesting
+  protected void checkNumPartitions() {
+    if (isPartitioned) {
+      boolean isUnpartitioned = true; // stays true only while every vertex has 1 partition
+      for (int i = 0; i < numPartitions.length; i++) {
+        Preconditions.checkArgument(this.numPartitions[i] > 0,
+          "Vertex " + sourceVertices[i] + " has non-positive (" + numPartitions[i] + ") partitions");
+        isUnpartitioned = isUnpartitioned && numPartitions[i] == 1;
+      }
+      Preconditions.checkArgument(!isUnpartitioned,
+        "every source vertex has 1 partition in a partitioned case");
+    } else { // unpartitioned configs must not carry partition counts
+      Preconditions.checkArgument(this.numPartitions == null,
+        "partition counts should be null in unpartitioned case");
+    }
+  }
+
+  /**
+   * @return the array of source vertices names
+   */
+  public List<String> getSourceVertices() {
+    return Collections.unmodifiableList(Arrays.asList(sourceVertices));
+  }
+
+  /**
+   * @return the array of number of partitions, the order is same as result of
+   *         <method>getSourceVertices</method>
+   */
+  public List<Integer> getNumPartitions() {
+    if (this.numPartitions == null) {
+      return null;
+    }
+    return Collections.unmodifiableList(Ints.asList(this.numPartitions));
+  }
+
+  public boolean getIsPartitioned() {
+    return isPartitioned;
+  }
+
+  public CartesianProductFilterDescriptor getFilterDescriptor() {
+    return this.filterDescriptor;
+  }
+
+  public UserPayload toUserPayload(TezConfiguration conf) throws IOException {
+    return UserPayload.create(ByteBuffer.wrap(toProto(conf).toByteArray()));
+  }
+
+  protected CartesianProductConfigProto toProto(TezConfiguration conf) {
+    CartesianProductConfigProto.Builder builder =
+      CartesianProductConfigProto.newBuilder();
+    builder.setIsPartitioned(this.isPartitioned)
+      .addAllSourceVertices(Arrays.asList(sourceVertices));
+
+    if (isPartitioned) {
+      builder.addAllNumPartitions(Ints.asList(numPartitions));
+      if (filterDescriptor != null) {
+        builder.setFilterClassName(filterDescriptor.getClassName());
+        UserPayload filterUesrPayload = filterDescriptor.getUserPayload();
+        if (filterUesrPayload != null) {
+          builder.setFilterUserPayload(ByteString.copyFrom(filterUesrPayload.getPayload()));
+        }
+      }
+    }
+
+    builder.setMinFraction(
+      CartesianProductVertexManager.TEZ_CAERESIAN_PRODUCT_SLOW_START_MIN_FRACTION_DEFAULT);
+    builder.setMaxFraction(
+      CartesianProductVertexManager.TEZ_CAERESIAN_PRODUCT_SLOW_START_MAX_FRACTION_DEFAULT);
+
+    if (conf != null) {
+      builder.setMinFraction(conf.getFloat(
+        CartesianProductVertexManager.TEZ_CAERESIAN_PRODUCT_SLOW_START_MIN_FRACTION,
+        CartesianProductVertexManager.TEZ_CAERESIAN_PRODUCT_SLOW_START_MIN_FRACTION_DEFAULT));
+      builder.setMaxFraction(conf.getFloat(
+        CartesianProductVertexManager.TEZ_CAERESIAN_PRODUCT_SLOW_START_MAX_FRACTION,
+        CartesianProductVertexManager.TEZ_CAERESIAN_PRODUCT_SLOW_START_MAX_FRACTION_DEFAULT));
+    }
+    Preconditions.checkArgument(builder.getMinFraction() <= builder.getMaxFraction(),
+      "min fraction(" + builder.getMinFraction() + ") should be less than max fraction(" +
+        builder.getMaxFraction() + ") in cartesian product slow start");
+
+    return builder.build();
+  }
+
+  protected static CartesianProductConfigProto userPayloadToProto(UserPayload payload)
+    throws InvalidProtocolBufferException {
+    Preconditions.checkArgument(payload != null, "UserPayload is null");
+    Preconditions.checkArgument(payload.getPayload() != null, "UserPayload carries null payload");
+    // decode the serialized CartesianProductConfigProto carried by the payload
+    return CartesianProductConfigProto.parseFrom(ByteString.copyFrom(payload.getPayload()));
+  }
+
+  protected static CartesianProductConfig fromUserPayload(UserPayload payload)
+    throws InvalidProtocolBufferException {
+    return fromProto(userPayloadToProto(payload));
+  }
+
+  protected static CartesianProductConfig fromProto(
+    CartesianProductConfigProto proto) {
+    if (!proto.getIsPartitioned()) {
+      return new CartesianProductConfig(proto.getSourceVerticesList());
+    } else {
+      String[] sourceVertices = new String[proto.getSourceVerticesList().size()];
+      proto.getSourceVerticesList().toArray(sourceVertices);
+      CartesianProductFilterDescriptor filterDescriptor = null;
+      if (proto.hasFilterClassName()) {
+        filterDescriptor = new CartesianProductFilterDescriptor(proto.getFilterClassName());
+        if (proto.hasFilterUserPayload()) {
+          filterDescriptor.setUserPayload(
+            UserPayload.create(ByteBuffer.wrap(proto.getFilterUserPayload().toByteArray())));
+        }
+      }
+      return new CartesianProductConfig(Ints.toArray(proto.getNumPartitionsList()),
+        sourceVertices, filterDescriptor);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/1a068b23/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductEdgeManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductEdgeManager.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductEdgeManager.java
new file mode 100644
index 0000000..96cce94
--- /dev/null
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductEdgeManager.java
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.runtime.library.cartesianproduct;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import org.apache.tez.dag.api.EdgeManagerPluginContext;
+import org.apache.tez.dag.api.EdgeManagerPluginOnDemand;
+import org.apache.tez.dag.api.TezException;
+
+import javax.annotation.Nullable;
+
+/**
+ * This EM wrap a real edge manager implementation object. It choose whether it's partitioned or
+ * unpartitioned implementation according to the config. All method invocations are actually
+ * redirected to real implementation.
+ */
+public class CartesianProductEdgeManager extends EdgeManagerPluginOnDemand {
+  private CartesianProductEdgeManagerReal edgeManagerReal;
+
+  public CartesianProductEdgeManager(EdgeManagerPluginContext context) {
+    super(context);
+  }
+
+  @Override
+  public void initialize() throws Exception {
+    Preconditions.checkArgument(getContext().getUserPayload() != null);
+    CartesianProductEdgeManagerConfig config = CartesianProductEdgeManagerConfig.fromUserPayload(
+      getContext().getUserPayload());
+    // no need to check config because config comes from VM and is already checked by VM
+    edgeManagerReal = config.getIsPartitioned()
+      ? new CartesianProductEdgeManagerPartitioned(getContext())
+      : new CartesianProductEdgeManagerUnpartitioned(getContext());
+    edgeManagerReal.initialize(config);
+  }
+
+  @VisibleForTesting
+  protected CartesianProductEdgeManagerReal getEdgeManagerReal() {
+    return this.edgeManagerReal;
+  }
+
+  @Override
+  public void prepareForRouting() throws Exception {
+    edgeManagerReal.prepareForRouting();
+  }
+
+  @Override
+  public int routeInputErrorEventToSource(int destTaskId, int failedInputId) throws Exception {
+    return edgeManagerReal.routeInputErrorEventToSource(destTaskId, failedInputId);
+  }
+
+  @Nullable
+  @Override
+  public EventRouteMetadata routeDataMovementEventToDestination(int srcTaskId,
+                                                                int srcOutputId,
+                                                                int destTaskId)
+    throws Exception {
+    return edgeManagerReal.routeDataMovementEventToDestination(srcTaskId, srcOutputId, destTaskId);
+  }
+
+  @Nullable
+  @Override
+  public EventRouteMetadata routeCompositeDataMovementEventToDestination(int srcTaskId,
+                                                                         int destTaskId)
+    throws Exception {
+    return edgeManagerReal.routeCompositeDataMovementEventToDestination(srcTaskId, destTaskId);
+  }
+
+  @Nullable
+  @Override
+  public EventRouteMetadata routeInputSourceTaskFailedEventToDestination(int srcTaskId,
+                                                                         int destTaskId)
+    throws Exception {
+    return edgeManagerReal.routeInputSourceTaskFailedEventToDestination(srcTaskId, destTaskId);
+  }
+
+  @Override
+  public int getNumDestinationTaskPhysicalInputs(int destTaskId) {
+    return edgeManagerReal.getNumDestinationTaskPhysicalInputs(destTaskId);
+  }
+
+  @Override
+  public int getNumSourceTaskPhysicalOutputs(int srcTaskId) {
+    return edgeManagerReal.getNumSourceTaskPhysicalOutputs(srcTaskId);
+  }
+
+  @Override
+  public int getNumDestinationConsumerTasks(int sourceTaskIndex) {
+    return edgeManagerReal.getNumDestinationConsumerTasks(sourceTaskIndex);
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tez/blob/1a068b23/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductEdgeManagerConfig.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductEdgeManagerConfig.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductEdgeManagerConfig.java
new file mode 100644
index 0000000..d48a0bb
--- /dev/null
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductEdgeManagerConfig.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.runtime.library.cartesianproduct;
+
+import com.google.common.primitives.Ints;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.InvalidProtocolBufferException;
+import org.apache.tez.dag.api.UserPayload;
+
+import java.nio.ByteBuffer;
+
+import static org.apache.tez.runtime.library.cartesianproduct.CartesianProductUserPayload.CartesianProductConfigProto;
+
+class CartesianProductEdgeManagerConfig extends CartesianProductConfig {
+  private final int[] numTasks;
+
+  protected CartesianProductEdgeManagerConfig(boolean isPartitioned, String[] sourceVertices,
+                                            int[] numPartitions, int[] numTasks,
+                                            CartesianProductFilterDescriptor filterDescriptor) {
+    super(isPartitioned, numPartitions, sourceVertices, filterDescriptor);
+    this.numTasks = numTasks;
+  }
+
+  public int[] getNumTasks() {
+    return this.numTasks;
+  }
+
+  public static CartesianProductEdgeManagerConfig fromUserPayload(UserPayload payload)
+    throws InvalidProtocolBufferException {
+    CartesianProductConfigProto proto =
+      CartesianProductConfigProto.parseFrom(ByteString.copyFrom(payload.getPayload()));
+
+    boolean isPartitioned = proto.getIsPartitioned();
+    String[] sourceVertices = new String[proto.getSourceVerticesList().size()];
+    proto.getSourceVerticesList().toArray(sourceVertices);
+    int[] numPartitions =
+      proto.getNumPartitionsCount() == 0 ? null : Ints.toArray(proto.getNumPartitionsList());
+    CartesianProductFilterDescriptor filterDescriptor = proto.hasFilterClassName()
+      ? new CartesianProductFilterDescriptor(proto.getFilterClassName()) : null;
+    // a filter payload without a filter class name has no descriptor to attach to; skip it
+    if (filterDescriptor != null && proto.hasFilterUserPayload()) {
+      filterDescriptor.setUserPayload(
+        UserPayload.create(ByteBuffer.wrap(proto.getFilterUserPayload().toByteArray())));
+    }
+    int[] numTasks = proto.getNumTasksCount() == 0 ? null : Ints.toArray(proto.getNumTasksList());
+    return new CartesianProductEdgeManagerConfig(isPartitioned, sourceVertices, numPartitions,
+      numTasks, filterDescriptor);
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tez/blob/1a068b23/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductEdgeManagerPartitioned.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductEdgeManagerPartitioned.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductEdgeManagerPartitioned.java
new file mode 100644
index 0000000..644d5af
--- /dev/null
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductEdgeManagerPartitioned.java
@@ -0,0 +1,124 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.runtime.library.cartesianproduct;
+
+import com.google.common.primitives.Ints;
+import org.apache.tez.common.ReflectionUtils;
+import org.apache.tez.dag.api.EdgeManagerPluginContext;
+import org.apache.tez.dag.api.EdgeManagerPluginOnDemand.EventRouteMetadata;
+import org.apache.tez.dag.api.UserPayload;
+
+import javax.annotation.Nullable;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+class CartesianProductEdgeManagerPartitioned extends CartesianProductEdgeManagerReal {
+  private int positionId;
+  private CartesianProductFilter filter;
+  private int[] taskIdMapping;
+  private CartesianProductEdgeManagerConfig config;
+  private int[] numPartitions;
+
+  public CartesianProductEdgeManagerPartitioned(EdgeManagerPluginContext context) {
+    super(context);
+  }
+
+  @Override
+  public void initialize(CartesianProductEdgeManagerConfig config) throws Exception {
+    this.config = config;
+    this.numPartitions = Ints.toArray(config.getNumPartitions());
+    positionId = config.getSourceVertices().indexOf(getContext().getSourceVertexName());
+    CartesianProductFilterDescriptor filterDescriptor = config.getFilterDescriptor();
+    if (filterDescriptor != null) {
+      filter = ReflectionUtils.createClazzInstance(filterDescriptor.getClassName(),
+        new Class[] {UserPayload.class}, new UserPayload[] {filterDescriptor.getUserPayload()});
+    }
+    generateTaskIdMapping();
+  }
+
+  @Override
+  public int routeInputErrorEventToSource(int destTaskId, int failedInputId) throws Exception {
+    return failedInputId;
+  }
+
+  @Override
+  public EventRouteMetadata routeDataMovementEventToDestination(int srcTaskId, int srcOutputId,
+                                                                int destTaskId) throws Exception {
+    int partition = CartesianProductCombination.fromTaskId(numPartitions,
+      getIdealTaskId(destTaskId)).getCombination().get(positionId);
+    return srcOutputId != partition ? null :
+      EventRouteMetadata.create(1, new int[]{srcTaskId});
+  }
+
+  @Nullable
+  @Override
+  public EventRouteMetadata routeCompositeDataMovementEventToDestination(int srcTaskId,
+                                                                         int destTaskId)
+    throws Exception {
+    int partition = CartesianProductCombination.fromTaskId(numPartitions,
+      getIdealTaskId(destTaskId)).getCombination().get(positionId);
+    return EventRouteMetadata.create(1, new int[]{srcTaskId}, new int[]{partition});
+  }
+
+  @Nullable
+  @Override
+  public EventRouteMetadata routeInputSourceTaskFailedEventToDestination(int srcTaskId,
+                                                                         int destTaskId)
+    throws Exception {
+    return EventRouteMetadata.create(1, new int[]{srcTaskId});
+  }
+
+  @Override
+  public int getNumDestinationTaskPhysicalInputs(int destTaskId) {
+    return getContext().getSourceVertexNumTasks();
+  }
+
+  @Override
+  public int getNumSourceTaskPhysicalOutputs(int srcTaskId) {
+    return numPartitions[positionId];
+  }
+
+  @Override
+  public int getNumDestinationConsumerTasks(int sourceTaskIndex) {
+    return getContext().getDestinationVertexNumTasks();
+  }
+
+  private void generateTaskIdMapping() {
+    List<Integer> idealTaskId = new ArrayList<>();
+    Map<String, Integer> vertexPartitionMap = new HashMap<>();
+    CartesianProductCombination combination =
+      new CartesianProductCombination(numPartitions);
+    combination.firstTask();
+    List<String> sourceVertices = config.getSourceVertices();
+    do {
+      for (int i = 0; i < sourceVertices.size(); i++) {
+        vertexPartitionMap.put(sourceVertices.get(i), combination.getCombination().get(i));
+      }
+      if (filter == null || filter.isValidCombination(vertexPartitionMap)) {
+        idealTaskId.add(combination.getTaskId());
+      }
+    } while (combination.nextTask());
+    this.taskIdMapping = Ints.toArray(idealTaskId);
+  }
+
+  private int getIdealTaskId(int realTaskId) {
+    return taskIdMapping[realTaskId];
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tez/blob/1a068b23/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductEdgeManagerReal.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductEdgeManagerReal.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductEdgeManagerReal.java
new file mode 100644
index 0000000..705db05
--- /dev/null
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductEdgeManagerReal.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.runtime.library.cartesianproduct;
+
+import org.apache.tez.dag.api.EdgeManagerPluginContext;
+import org.apache.tez.dag.api.EdgeManagerPluginOnDemand.EventRouteMetadata;
+
+/**
+ * Common base of the cartesian product edge manager implementations. It keeps the plugin
+ * context and declares the routing and sizing operations each implementation must supply.
+ */
+abstract class CartesianProductEdgeManagerReal {
+  private final EdgeManagerPluginContext context;
+
+  /**
+   * @param context plugin context, later available through {@link #getContext()}
+   */
+  public CartesianProductEdgeManagerReal(EdgeManagerPluginContext context) {
+    this.context = context;
+  }
+
+  /**
+   * @return the plugin context handed to the constructor
+   */
+  public EdgeManagerPluginContext getContext() {
+    return context;
+  }
+
+  /**
+   * Sets this edge manager up from the given config.
+   */
+  public abstract void initialize(CartesianProductEdgeManagerConfig config) throws Exception;
+
+  /**
+   * Hook with an empty default body; implementations may override it.
+   */
+  public void prepareForRouting() throws Exception {}
+
+  /**
+   * @param destTaskId destination task index
+   * @param failedInputId index of the failed input on that destination task
+   * @return an int identifying the source side; presumably the source task index (confirm
+   *         against the implementations)
+   */
+  public abstract int routeInputErrorEventToSource(
+    int destTaskId, int failedInputId) throws Exception;
+
+  /**
+   * @return routing metadata for the given source task / output / destination task triple;
+   *         implementations in this package may return null
+   */
+  public abstract EventRouteMetadata routeDataMovementEventToDestination(
+    int srcTaskId, int srcOutputId, int destTaskId) throws Exception;
+
+  /**
+   * @return routing metadata for the given source task / destination task pair;
+   *         implementations in this package may return null
+   */
+  public abstract EventRouteMetadata routeCompositeDataMovementEventToDestination(
+    int srcTaskId, int destTaskId) throws Exception;
+
+  /**
+   * @return routing metadata for the given source task / destination task pair;
+   *         implementations in this package may return null
+   */
+  public abstract EventRouteMetadata routeInputSourceTaskFailedEventToDestination(
+    int srcTaskId, int destTaskId) throws Exception;
+
+  /** @return number of physical inputs of the given destination task */
+  public abstract int getNumDestinationTaskPhysicalInputs(int destTaskId);
+
+  /** @return number of physical outputs of the given source task */
+  public abstract int getNumSourceTaskPhysicalOutputs(int srcTaskId);
+
+  /** @return number of destination tasks consuming the given source task */
+  public abstract int getNumDestinationConsumerTasks(int sourceTaskIndex);
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/1a068b23/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductEdgeManagerUnpartitioned.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductEdgeManagerUnpartitioned.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductEdgeManagerUnpartitioned.java
new file mode 100644
index 0000000..cea4142
--- /dev/null
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductEdgeManagerUnpartitioned.java
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.runtime.library.cartesianproduct;
+
+import org.apache.tez.dag.api.EdgeManagerPluginContext;
+import org.apache.tez.dag.api.EdgeManagerPluginOnDemand;
+
+import javax.annotation.Nullable;
+import java.util.Arrays;
+
+import static org.apache.tez.dag.api.EdgeManagerPluginOnDemand.*;
+
+/**
+ * Edge manager implementation for the unpartitioned case. Every destination task id is
+ * decoded into a combination of source task indices (one per source vertex); the entry at
+ * this edge's position names the single source task that destination task reads from.
+ */
+class CartesianProductEdgeManagerUnpartitioned extends CartesianProductEdgeManagerReal {
+  /** Index of this edge's source vertex within the configured source vertex list. */
+  private int positionId;
+  /** Task count per source vertex, as provided by the config; may be null. */
+  private int[] numTasks;
+  /** Cached result for {@link #getNumDestinationConsumerTasks(int)}; 0 until computed. */
+  private int numDestinationConsumerTasks;
+
+  public CartesianProductEdgeManagerUnpartitioned(EdgeManagerPluginContext context) {
+    super(context);
+  }
+
+  /**
+   * Reads the position of the source vertex and the per-vertex task counts from the config and,
+   * when the counts are known and this vertex has tasks, precomputes how many destination tasks
+   * consume each source task.
+   *
+   * @param config edge manager config carrying the source vertex list and task counts
+   */
+  @Override
+  public void initialize(CartesianProductEdgeManagerConfig config) {
+    positionId = config.getSourceVertices().indexOf(getContext().getSourceVertexName());
+    this.numTasks = config.getNumTasks();
+
+    if (numTasks != null && numTasks[positionId] != 0) {
+      // Multiply only the other vertices' task counts. Multiplying everything and dividing
+      // afterwards gives the same value but can overflow int even when the result fits.
+      int consumers = 1;
+      for (int i = 0; i < numTasks.length; i++) {
+        if (i != positionId) {
+          consumers *= numTasks[i];
+        }
+      }
+      numDestinationConsumerTasks = consumers;
+    }
+  }
+
+  /**
+   * @param destTaskId destination task index
+   * @param failedInputId not consulted
+   * @return the source task index that the destination task's combination holds at this
+   *         edge's position
+   */
+  @Override
+  public int routeInputErrorEventToSource(int destTaskId, int failedInputId) throws Exception {
+    return sourceTaskIndexOf(destTaskId);
+  }
+
+  /**
+   * @return metadata with one event targeting input 0 when {@code srcTaskId} is the source task
+   *         of {@code destTaskId}; null otherwise
+   */
+  @Nullable
+  @Override
+  public EventRouteMetadata routeDataMovementEventToDestination(int srcTaskId, int srcOutputId,
+                                                                int destTaskId) throws Exception {
+    return sourceTaskIndexOf(destTaskId) == srcTaskId
+      ? EventRouteMetadata.create(1, new int[]{0}) : null;
+  }
+
+  /**
+   * @return metadata with one event (target index 0, source index 0) when {@code srcTaskId} is
+   *         the source task of {@code destTaskId}; null otherwise
+   */
+  @Nullable
+  @Override
+  public EventRouteMetadata routeCompositeDataMovementEventToDestination(int srcTaskId,
+                                                                         int destTaskId)
+    throws Exception {
+    return sourceTaskIndexOf(destTaskId) == srcTaskId
+      ? EventRouteMetadata.create(1, new int[]{0}, new int[]{0}) : null;
+  }
+
+  /**
+   * @return metadata with one event targeting input 0 when {@code srcTaskId} is the source task
+   *         of {@code destTaskId}; null otherwise
+   */
+  @Nullable
+  @Override
+  public EventRouteMetadata routeInputSourceTaskFailedEventToDestination(int srcTaskId,
+                                                                         int destTaskId)
+    throws Exception {
+    return sourceTaskIndexOf(destTaskId) == srcTaskId
+      ? EventRouteMetadata.create(1, new int[]{0}) : null;
+  }
+
+  /** Always 1: a destination task has exactly one physical input on this edge. */
+  @Override
+  public int getNumDestinationTaskPhysicalInputs(int destTaskId) {
+    return 1;
+  }
+
+  /** Always 1: a source task has exactly one physical output on this edge. */
+  @Override
+  public int getNumSourceTaskPhysicalOutputs(int srcTaskId) {
+    return 1;
+  }
+
+  /** @return the consumer count precomputed in {@code initialize}, or 0 if it was not computed */
+  @Override
+  public int getNumDestinationConsumerTasks(int sourceTaskIndex) {
+    return numDestinationConsumerTasks;
+  }
+
+  /**
+   * Decodes {@code destTaskId} into its combination and returns the entry at this edge's
+   * position. Declared with a broad throws clause to match the routing methods that call it.
+   */
+  private int sourceTaskIndexOf(int destTaskId) throws Exception {
+    return CartesianProductCombination.fromTaskId(numTasks, destTaskId)
+      .getCombination().get(positionId);
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tez/blob/1a068b23/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductFilter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductFilter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductFilter.java
new file mode 100644
index 0000000..5b6456e
--- /dev/null
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductFilter.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.runtime.library.cartesianproduct;
+
+import org.apache.tez.dag.api.UserPayload;
+
+import java.util.Map;
+
+import static org.apache.hadoop.classification.InterfaceStability.Evolving;
+
+/**
+ * Base class for custom cartesian product filters. Users extend it and implement
+ * {@link #isValidCombination(Map)}.
+ */
+@Evolving
+public abstract class CartesianProductFilter {
+  private final UserPayload userPayload;
+
+  /**
+   * @param payload user payload kept for later retrieval through {@link #getUserPayload()}
+   */
+  public CartesianProductFilter(UserPayload payload) {
+    userPayload = payload;
+  }
+
+  /**
+   * @return the user payload handed to the constructor
+   */
+  public UserPayload getUserPayload() {
+    return this.userPayload;
+  }
+
+  /**
+   * Decides whether a combination of partitions is valid.
+   *
+   * @param vertexPartitionMap the map from vertex name to partition id
+   * @return whether this combination of partitions is valid
+   */
+  public abstract boolean isValidCombination(Map<String, Integer> vertexPartitionMap);
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/1a068b23/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductFilterDescriptor.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductFilterDescriptor.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductFilterDescriptor.java
new file mode 100644
index 0000000..bc81755
--- /dev/null
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductFilterDescriptor.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.runtime.library.cartesianproduct;
+
+import org.apache.tez.dag.api.EntityDescriptor;
+
+/**
+ * Entity descriptor naming the filter class used for a cartesian product.
+ */
+public class CartesianProductFilterDescriptor extends
+  EntityDescriptor<CartesianProductFilterDescriptor> {
+
+  /**
+   * @param filterClassName class name passed on to {@link EntityDescriptor}
+   */
+  public CartesianProductFilterDescriptor(String filterClassName) {
+    super(filterClassName);
+  }
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/1a068b23/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductVertexManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductVertexManager.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductVertexManager.java
new file mode 100644
index 0000000..659d3b7
--- /dev/null
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductVertexManager.java
@@ -0,0 +1,139 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.runtime.library.cartesianproduct;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import org.apache.tez.dag.api.EdgeProperty;
+import org.apache.tez.dag.api.InputDescriptor;
+import org.apache.tez.dag.api.TezException;
+import org.apache.tez.dag.api.VertexManagerPlugin;
+import org.apache.tez.dag.api.VertexManagerPluginContext;
+import org.apache.tez.dag.api.event.VertexStateUpdate;
+import org.apache.tez.runtime.api.Event;
+import org.apache.tez.runtime.api.TaskAttemptIdentifier;
+import org.apache.tez.runtime.api.events.VertexManagerEvent;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Vertex manager plugin for cartesian product vertices. It wraps a "real" vertex manager
+ * implementation, picked in {@link #initialize()} as the partitioned or the unpartitioned
+ * variant according to the config, and forwards its callbacks to that implementation.
+ */
+public class CartesianProductVertexManager extends VertexManagerPlugin {
+  // The "CAERESIAN" spelling is part of these public constant names and is kept unchanged so
+  // existing references keep compiling.
+  public static final String TEZ_CAERESIAN_PRODUCT_SLOW_START_MIN_FRACTION =
+    "tez.cartesian-product.min-src-fraction";
+  public static final float TEZ_CAERESIAN_PRODUCT_SLOW_START_MIN_FRACTION_DEFAULT = 0.25f;
+  // Must be a different key than the min fraction; sharing one key would make it impossible
+  // to configure the two fractions independently.
+  public static final String TEZ_CAERESIAN_PRODUCT_SLOW_START_MAX_FRACTION =
+    "tez.cartesian-product.max-src-fraction";
+  public static final float TEZ_CAERESIAN_PRODUCT_SLOW_START_MAX_FRACTION_DEFAULT = 0.75f;
+
+  /** Implementation all callbacks are forwarded to; set by {@link #initialize()}. */
+  private CartesianProductVertexManagerReal vertexManagerReal = null;
+
+  public CartesianProductVertexManager(VertexManagerPluginContext context) {
+    super(context);
+  }
+
+  /**
+   * Parses the config from the user payload, verifies that the DAG's incoming edges and the
+   * config agree on which source vertices take part in the cartesian product, then creates and
+   * initializes the real vertex manager.
+   *
+   * @throws IllegalArgumentException if the DAG and the config are inconsistent
+   * @throws Exception if the payload cannot be parsed or the real vertex manager fails to
+   *                   initialize
+   */
+  @Override
+  public void initialize() throws Exception {
+    CartesianProductVertexManagerConfig config =
+      CartesianProductVertexManagerConfig.fromUserPayload(getContext().getUserPayload());
+    // check whether DAG and config are consistent
+    Map<String, EdgeProperty> edgePropertyMap = getContext().getInputVertexEdgeProperties();
+    Set<String> sourceVerticesDAG = edgePropertyMap.keySet();
+    Set<String> sourceVerticesConfig = new HashSet<>(config.getSourceVertices());
+
+    // DAG -> config: an incoming edge uses CartesianProductEdgeManager exactly when its source
+    // vertex is listed in the config. The lookup must go against the config set; checking the
+    // DAG key set here would compare the map's keys with themselves.
+    for (Map.Entry<String, EdgeProperty> entry : edgePropertyMap.entrySet()) {
+      if (hasCartesianProductEdgeManager(entry.getValue())) {
+        Preconditions.checkArgument(sourceVerticesConfig.contains(entry.getKey()),
+          entry.getKey() + " has CartesianProductEdgeManager but isn't in " +
+            "CartesianProductVertexManagerConfig");
+      } else {
+        Preconditions.checkArgument(!sourceVerticesConfig.contains(entry.getKey()),
+          entry.getKey() + " has no CartesianProductEdgeManager but is in " +
+            "CartesianProductVertexManagerConfig");
+      }
+    }
+
+    // config -> DAG: every configured vertex must be a source vertex with the right edge manager
+    for (String vertex : sourceVerticesConfig) {
+      Preconditions.checkArgument(sourceVerticesDAG.contains(vertex),
+        vertex + " is in CartesianProductVertexManagerConfig but not a source vertex in DAG");
+      Preconditions.checkArgument(
+        hasCartesianProductEdgeManager(edgePropertyMap.get(vertex)),
+        vertex + " is in CartesianProductVertexManagerConfig and a source vertex, but has no " +
+          "CartesianProductEdgeManager");
+    }
+
+    vertexManagerReal = config.getIsPartitioned()
+      ? new CartesianProductVertexManagerPartitioned(getContext())
+      : new CartesianProductVertexManagerUnpartitioned(getContext());
+    vertexManagerReal.initialize(config);
+  }
+
+  /**
+   * Tells whether the edge is managed by {@link CartesianProductEdgeManager}. An edge without
+   * an edge manager descriptor is treated as not being a cartesian product edge instead of
+   * failing with a NullPointerException.
+   */
+  private static boolean hasCartesianProductEdgeManager(EdgeProperty edgeProperty) {
+    return edgeProperty.getEdgeManagerDescriptor() != null
+      && CartesianProductEdgeManager.class.getName()
+        .equals(edgeProperty.getEdgeManagerDescriptor().getClassName());
+  }
+
+  @VisibleForTesting
+  protected CartesianProductVertexManagerReal getVertexManagerReal() {
+    return this.vertexManagerReal;
+  }
+
+  /**
+   * Forwards the event to the real vertex manager, whose base implementation is a no-op;
+   * intended for locality based optimization in future.
+   * @param vmEvent event to forward
+   * @throws Exception
+   */
+  @Override
+  public void onVertexManagerEventReceived(VertexManagerEvent vmEvent) throws Exception {
+    vertexManagerReal.onVertexManagerEventReceived(vmEvent);
+  }
+
+  /**
+   * Currently direct input to cartesian product vertex is not supported, so this always throws.
+   * @param inputName
+   * @param inputDescriptor
+   * @param events
+   * @throws TezException always
+   */
+  @Override
+  public void onRootVertexInitialized(String inputName, InputDescriptor inputDescriptor,
+                                      List<Event> events) throws Exception {
+    throw new TezException("Direct input to cartesian product vertex is not supported yet");
+  }
+
+  /** Forwards the callback to the real vertex manager. */
+  @Override
+  public void onVertexStarted(List<TaskAttemptIdentifier> completions) throws Exception {
+    vertexManagerReal.onVertexStarted(completions);
+  }
+
+  /** Forwards the callback to the real vertex manager. */
+  @Override
+  public void onVertexStateUpdated(VertexStateUpdate stateUpdate) throws Exception {
+    vertexManagerReal.onVertexStateUpdated(stateUpdate);
+  }
+
+  /** Forwards the callback to the real vertex manager. */
+  @Override
+  public void onSourceTaskCompleted(TaskAttemptIdentifier attempt) throws Exception {
+    vertexManagerReal.onSourceTaskCompleted(attempt);
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tez/blob/1a068b23/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductVertexManagerConfig.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductVertexManagerConfig.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductVertexManagerConfig.java
new file mode 100644
index 0000000..b324524
--- /dev/null
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductVertexManagerConfig.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.runtime.library.cartesianproduct;
+
+import com.google.common.base.Preconditions;
+import com.google.common.primitives.Ints;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.InvalidProtocolBufferException;
+import org.apache.tez.dag.api.UserPayload;
+
+import java.nio.ByteBuffer;
+
+import static org.apache.tez.runtime.library.cartesianproduct.CartesianProductUserPayload.*;
+
+/**
+ * Config of the cartesian product vertex manager: the base cartesian product config plus the
+ * min/max source-completion fractions used for slow start.
+ */
+class CartesianProductVertexManagerConfig extends CartesianProductConfig {
+  private final float minFraction;
+  private final float maxFraction;
+
+  /**
+   * @param isPartitioned whether the partitioned implementation is used
+   * @param sourceVertices names of the source vertices
+   * @param numPartitions partition count per source vertex; may be null
+   * @param minFraction slow start min fraction; must not exceed {@code maxFraction}
+   * @param maxFraction slow start max fraction
+   * @param filterDescriptor descriptor of the combination filter; may be null
+   * @throws IllegalArgumentException if {@code minFraction > maxFraction}
+   */
+  public CartesianProductVertexManagerConfig(boolean isPartitioned, String[] sourceVertices,
+                                             int[] numPartitions,
+                                             float minFraction, float maxFraction,
+                                             CartesianProductFilterDescriptor filterDescriptor) {
+    super(isPartitioned, numPartitions, sourceVertices, filterDescriptor);
+    // equal fractions are accepted, so the message must not claim "less than"
+    Preconditions.checkArgument(minFraction <= maxFraction,
+      "min fraction(" + minFraction + ") should not be greater than max fraction(" +
+        maxFraction  + ") in cartesian product slow start");
+    this.minFraction = minFraction;
+    this.maxFraction = maxFraction;
+  }
+
+  public float getMinFraction() {
+    return minFraction;
+  }
+
+  public float getMaxFraction() {
+    return maxFraction;
+  }
+
+  /**
+   * Rebuilds the config from its protobuf form stored in a user payload.
+   *
+   * @param payload user payload holding a serialized {@code CartesianProductConfigProto}
+   * @return the decoded config
+   * @throws InvalidProtocolBufferException if the payload cannot be parsed
+   * @throws IllegalArgumentException if a filter user payload is present without a filter
+   *                                  class name, or if min fraction exceeds max fraction
+   */
+  public static CartesianProductVertexManagerConfig fromUserPayload(UserPayload payload)
+    throws InvalidProtocolBufferException {
+    CartesianProductConfigProto proto =
+      CartesianProductConfigProto.parseFrom(ByteString.copyFrom(payload.getPayload()));
+
+    boolean isPartitioned = proto.getIsPartitioned();
+    String[] sourceVertices = new String[proto.getSourceVerticesList().size()];
+    proto.getSourceVerticesList().toArray(sourceVertices);
+    int[] numPartitions =
+      proto.getNumPartitionsCount() == 0 ? null : Ints.toArray(proto.getNumPartitionsList());
+
+    CartesianProductFilterDescriptor filterDescriptor = null;
+    if (proto.hasFilterClassName()) {
+      filterDescriptor = new CartesianProductFilterDescriptor(proto.getFilterClassName());
+      if (proto.hasFilterUserPayload()) {
+        filterDescriptor.setUserPayload(
+          UserPayload.create(ByteBuffer.wrap(proto.getFilterUserPayload().toByteArray())));
+      }
+    } else {
+      // a payload without a class has no descriptor to attach to; report it clearly instead
+      // of dereferencing a null descriptor
+      Preconditions.checkArgument(!proto.hasFilterUserPayload(),
+        "filter user payload is set but filter class name is missing");
+    }
+
+    float minFraction = proto.getMinFraction();
+    float maxFraction = proto.getMaxFraction();
+    return new CartesianProductVertexManagerConfig(isPartitioned, sourceVertices, numPartitions,
+      minFraction, maxFraction, filterDescriptor);
+  }
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/1a068b23/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductVertexManagerPartitioned.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductVertexManagerPartitioned.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductVertexManagerPartitioned.java
new file mode 100644
index 0000000..af2abae
--- /dev/null
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductVertexManagerPartitioned.java
@@ -0,0 +1,176 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.runtime.library.cartesianproduct;
+
+import com.google.common.base.Preconditions;
+import com.google.common.primitives.Ints;
+import org.apache.tez.common.ReflectionUtils;
+import org.apache.tez.dag.api.TezReflectionException;
+import org.apache.tez.dag.api.UserPayload;
+import org.apache.tez.dag.api.VertexManagerPluginContext;
+import org.apache.tez.dag.api.VertexManagerPluginContext.ScheduleTaskRequest;
+import org.apache.tez.dag.api.event.VertexState;
+import org.apache.tez.dag.api.event.VertexStateUpdate;
+import org.apache.tez.runtime.api.TaskAttemptIdentifier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.BitSet;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Starts scheduling tasks when the fraction of completed source tasks crosses the min
+ * fraction and schedules all tasks once the max fraction is reached. The vertex parallelism is
+ * set to the number of partition combinations accepted by the optional filter.
+ */
+class CartesianProductVertexManagerPartitioned extends CartesianProductVertexManagerReal {
+  private CartesianProductVertexManagerConfig config;
+  private List<String> sourceVertices;
+  /** Number of accepted combinations; computed once in {@link #reconfigureVertex()}. */
+  private int parallelism = 0;
+  private boolean vertexStarted = false;
+  private boolean vertexReconfigured = false;
+  private int numSourceVertexConfigured = 0;
+  private CartesianProductFilter filter;
+  /** Per source vertex, the set of task ids already counted as completed. */
+  private Map<String, BitSet> sourceTaskCompleted = new HashMap<>();
+  private int numFinishedSrcTasks = 0;
+  private int totalNumSrcTasks = 0;
+  /** Highest task id handed to the scheduler so far; -1 when none. */
+  private int lastScheduledTaskId = -1;
+  private static final Logger LOG =
+    LoggerFactory.getLogger(CartesianProductVertexManagerPartitioned.class);
+
+  public CartesianProductVertexManagerPartitioned(VertexManagerPluginContext context) {
+    super(context);
+  }
+
+  /**
+   * Stores the config, instantiates the filter if one is configured, registers for CONFIGURED
+   * state updates of every source vertex and announces the planned reconfiguration.
+   *
+   * @throws TezReflectionException if the filter class cannot be instantiated
+   */
+  @Override
+  public void initialize(CartesianProductVertexManagerConfig config) throws TezReflectionException {
+    this.config = config;
+    this.sourceVertices = config.getSourceVertices();
+    CartesianProductFilterDescriptor filterDescriptor = config.getFilterDescriptor();
+    if (filterDescriptor != null) {
+      try {
+        filter = ReflectionUtils.createClazzInstance(filterDescriptor.getClassName(),
+          new Class[]{UserPayload.class}, new UserPayload[]{filterDescriptor.getUserPayload()});
+      } catch (TezReflectionException e) {
+        // keep the cause in the log; the exception is still propagated to the caller
+        LOG.error("Creating filter failed", e);
+        throw e;
+      }
+    }
+    for (String vertex : sourceVertices) {
+      sourceTaskCompleted.put(vertex, new BitSet());
+      getContext().registerForVertexStateUpdates(vertex, EnumSet.of(VertexState.CONFIGURED));
+    }
+    getContext().vertexReconfigurationPlanned();
+  }
+
+  /**
+   * Tries all combinations, checks each against the filter to get the final parallelism and
+   * reconfigures the vertex with it.
+   */
+  private void reconfigureVertex() throws IOException {
+    Map<String, Integer> vertexPartitionMap = new HashMap<>();
+    // count locally so that the field only ever holds a complete result
+    int validCombinations = 0;
+
+    CartesianProductCombination combination =
+      new CartesianProductCombination(Ints.toArray(config.getNumPartitions()));
+    combination.firstTask();
+    do {
+      for (int i = 0; i < sourceVertices.size(); i++) {
+        vertexPartitionMap.put(sourceVertices.get(i), combination.getCombination().get(i));
+      }
+      if (filter == null || filter.isValidCombination(vertexPartitionMap)) {
+        validCombinations++;
+      }
+    } while (combination.nextTask());
+    parallelism = validCombinations;
+    // no need to reconfigure EM because EM already has all necessary information via config object
+    getContext().reconfigureVertex(parallelism, null, null);
+    vertexReconfigured = true;
+    getContext().doneReconfiguringVertex();
+  }
+
+  /**
+   * Marks the vertex as started, replays the source completions that happened before the start
+   * and tries to schedule.
+   *
+   * @param completions source task attempts already completed; may be null
+   */
+  @Override
+  public synchronized void onVertexStarted(List<TaskAttemptIdentifier> completions)
+    throws Exception {
+    vertexStarted = true;
+    if (completions != null) {
+      for (TaskAttemptIdentifier attempt : completions) {
+        onSourceTaskCompleted(attempt);
+      }
+    }
+    // try schedule because there may be no more vertex state update and source completions
+    tryScheduleTask();
+  }
+
+  /**
+   * Handles a CONFIGURED update of a source vertex: reconfigures this vertex on the first
+   * update, adds the source vertex's task count to the total and tries to schedule.
+   *
+   * @throws IllegalArgumentException if the update is for any state other than CONFIGURED
+   */
+  @Override
+  public synchronized void onVertexStateUpdated(VertexStateUpdate stateUpdate) throws IOException {
+    Preconditions.checkArgument(stateUpdate.getVertexState() == VertexState.CONFIGURED);
+    if (!vertexReconfigured) {
+      reconfigureVertex();
+    }
+    numSourceVertexConfigured++;
+    totalNumSrcTasks += getContext().getVertexNumTasks(stateUpdate.getVertexName());
+    // try schedule because there may be no more vertex start and source completions
+    tryScheduleTask();
+  }
+
+  /**
+   * Counts a source task completion once per task (repeated completions of the same task id
+   * are ignored) and tries to schedule.
+   */
+  @Override
+  public synchronized void onSourceTaskCompleted(TaskAttemptIdentifier attempt) throws Exception {
+    int taskId = attempt.getTaskIdentifier().getIdentifier();
+    String vertex = attempt.getTaskIdentifier().getVertexIdentifier().getName();
+    BitSet bitSet = this.sourceTaskCompleted.get(vertex);
+    if (!bitSet.get(taskId)) {
+      bitSet.set(taskId);
+      numFinishedSrcTasks++;
+      tryScheduleTask();
+    }
+  }
+
+  /**
+   * Schedules tasks in ascending order of id. Slow start is meant to behave like
+   * ShuffleVertexManager's.
+   */
+  private void tryScheduleTask() {
+    // only schedule task when vertex is already started and all source vertices are configured
+    if (!vertexStarted
+      || numSourceVertexConfigured != sourceVertices.size()) {
+      return;
+    }
+    int numTaskToSchedule = computeNumTaskToSchedule();
+    // schedule tasks if there are more we can schedule
+    if (numTaskToSchedule - 1 > lastScheduledTaskId) {
+      List<ScheduleTaskRequest> scheduleTaskRequests = new ArrayList<>();
+      for (int i = lastScheduledTaskId + 1; i < numTaskToSchedule; i++) {
+        scheduleTaskRequests.add(ScheduleTaskRequest.create(i, null));
+      }
+      lastScheduledTaskId = numTaskToSchedule - 1;
+      getContext().scheduleTasks(scheduleTaskRequests);
+    }
+  }
+
+  /**
+   * Determines how many tasks (ids 0..n-1) may be scheduled for the current fraction of
+   * finished source tasks: none below the min fraction, all at or above the max fraction and a
+   * linear share in between.
+   */
+  private int computeNumTaskToSchedule() {
+    if (totalNumSrcTasks == 0) {
+      // nothing to wait for
+      return parallelism;
+    }
+    float minFraction = config.getMinFraction();
+    float maxFraction = config.getMaxFraction();
+    float percentFinishedSrcTask = numFinishedSrcTasks * 1f / totalNumSrcTasks;
+    if (percentFinishedSrcTask < minFraction) {
+      return 0;
+    }
+    // Test the upper bound before interpolating: with min == max the interpolation would
+    // divide zero by zero and yield 0 tasks, so a vertex configured with min == max == 1
+    // would never be scheduled.
+    if (percentFinishedSrcTask >= maxFraction) {
+      return parallelism;
+    }
+    return (int) ((percentFinishedSrcTask - minFraction)
+      / (maxFraction - minFraction) * parallelism);
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tez/blob/1a068b23/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductVertexManagerReal.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductVertexManagerReal.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductVertexManagerReal.java
new file mode 100644
index 0000000..84e65ac
--- /dev/null
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductVertexManagerReal.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.runtime.library.cartesianproduct;
+
+import org.apache.tez.dag.api.VertexManagerPluginContext;
+import org.apache.tez.dag.api.event.VertexStateUpdate;
+import org.apache.tez.runtime.api.TaskAttemptIdentifier;
+import org.apache.tez.runtime.api.events.VertexManagerEvent;
+
+import java.util.List;
+
+/**
+ * Base class of cartesian product vertex manager implementations; owns the plugin context.
+ */
+abstract class CartesianProductVertexManagerReal {
+  private final VertexManagerPluginContext context;
+  // Stores the given plugin context; the field is final, so it is never reassigned afterwards.
+  public CartesianProductVertexManagerReal(VertexManagerPluginContext context) {
+    this.context = context;
+  }
+  // Returns the constructor-supplied context; final, so subclasses cannot override it.
+  public final VertexManagerPluginContext getContext() {
+    return this.context;
+  }
+  // Subclass hook taking the vertex manager config; implementations may throw any Exception.
+  public abstract void initialize(CartesianProductVertexManagerConfig config) throws Exception;
+  // The only callback with a default body: does nothing unless a subclass overrides it.
+  public void onVertexManagerEventReceived(VertexManagerEvent vmEvent) {}
+  // Subclass hook; presumably invoked when the managed vertex starts -- confirm with the caller.
+  public abstract void onVertexStarted(List<TaskAttemptIdentifier> completions) throws Exception;
+  // Subclass hook for a VertexStateUpdate; the sender is not visible from this class.
+  public abstract void onVertexStateUpdated(VertexStateUpdate stateUpdate) throws Exception;
+  // Subclass hook; presumably called once per completed source task attempt -- confirm.
+  public abstract void onSourceTaskCompleted(TaskAttemptIdentifier attempt) throws Exception;
+}