You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by mi...@apache.org on 2016/09/06 17:51:15 UTC

[2/2] tez git commit: TEZ-3230. Implement vertex manager and edge manager of cartesian product edge. (Zhiyuan Yang via mingma)

TEZ-3230. Implement vertex manager and edge manager of cartesian product edge. (Zhiyuan Yang via mingma)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/1a068b23
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/1a068b23
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/1a068b23

Branch: refs/heads/master
Commit: 1a068b2391684563bb53a0720848b7673d8dc46c
Parents: af82469
Author: Ming Ma <mi...@twitter.com>
Authored: Tue Sep 6 10:49:50 2016 -0700
Committer: Ming Ma <mi...@twitter.com>
Committed: Tue Sep 6 10:49:50 2016 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../tez/dag/app/dag/impl/TaskAttemptImpl.java   |   3 +-
 .../apache/tez/examples/CartesianProduct.java   | 208 ++++++++++++++
 tez-runtime-library/findbugs-exclude.xml        |  18 ++
 tez-runtime-library/pom.xml                     |   1 +
 .../CartesianProductCombination.java            | 164 +++++++++++
 .../CartesianProductConfig.java                 | 255 +++++++++++++++++
 .../CartesianProductEdgeManager.java            | 106 +++++++
 .../CartesianProductEdgeManagerConfig.java      |  64 +++++
 .../CartesianProductEdgeManagerPartitioned.java | 124 ++++++++
 .../CartesianProductEdgeManagerReal.java        |  62 ++++
 ...artesianProductEdgeManagerUnpartitioned.java |  98 +++++++
 .../CartesianProductFilter.java                 |  47 +++
 .../CartesianProductFilterDescriptor.java       |  28 ++
 .../CartesianProductVertexManager.java          | 139 +++++++++
 .../CartesianProductVertexManagerConfig.java    |  75 +++++
 ...artesianProductVertexManagerPartitioned.java | 176 ++++++++++++
 .../CartesianProductVertexManagerReal.java      |  50 ++++
 ...tesianProductVertexManagerUnpartitioned.java | 178 ++++++++++++
 .../main/proto/CartesianProductPayload.proto    |  31 ++
 .../TestCartesianProductCombination.java        | 110 +++++++
 .../TestCartesianProductConfig.java             | 106 +++++++
 .../TestCartesianProductEdgeManager.java        |  68 +++++
 ...tCartesianProductEdgeManagerPartitioned.java | 284 +++++++++++++++++++
 ...artesianProductEdgeManagerUnpartitioned.java | 240 ++++++++++++++++
 .../TestCartesianProductVertexManager.java      |  67 +++++
 ...artesianProductVertexManagerPartitioned.java | 230 +++++++++++++++
 ...tesianProductVertexManagerUnpartitioned.java | 194 +++++++++++++
 .../org/apache/tez/test/TestFaultTolerance.java |  74 ++++-
 29 files changed, 3198 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/1a068b23/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index b73dd3f..0225db6 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -7,6 +7,7 @@ INCOMPATIBLE CHANGES
 
 ALL CHANGES:
 
+  TEZ-3230. Implement vertex manager and edge manager of cartesian product edge.
   TEZ-3326. Display JVM system properties in AM and task logs.
   TEZ-3009. Errors that occur during container task acquisition are not logged.
   TEZ-2852. TestVertexImpl fails due to race in AsyncDispatcher.

http://git-wip-us.apache.org/repos/asf/tez/blob/1a068b23/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
index e39315b..e5f3e71 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
@@ -1705,8 +1705,7 @@ public class TaskAttemptImpl implements TaskAttempt,
       }
       
       int readErrorTimespanSec = (int)((time - firstErrReportTime)/1000);
-      boolean crossTimeDeadline = readErrorTimespanSec >=
-      MAX_ALLOWED_TIME_FOR_TASK_READ_ERROR_SEC ? true : false;
+      boolean crossTimeDeadline = readErrorTimespanSec >= MAX_ALLOWED_TIME_FOR_TASK_READ_ERROR_SEC;
 
       float failureFraction = ((float) attempt.uniquefailedOutputReports.size())
           / outputFailedEvent.getConsumerTaskNumber();

http://git-wip-us.apache.org/repos/asf/tez/blob/1a068b23/tez-examples/src/main/java/org/apache/tez/examples/CartesianProduct.java
----------------------------------------------------------------------
diff --git a/tez-examples/src/main/java/org/apache/tez/examples/CartesianProduct.java b/tez-examples/src/main/java/org/apache/tez/examples/CartesianProduct.java
new file mode 100644
index 0000000..9f3d490
--- /dev/null
+++ b/tez-examples/src/main/java/org/apache/tez/examples/CartesianProduct.java
@@ -0,0 +1,208 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.examples;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.tez.client.TezClient;
+import org.apache.tez.dag.api.DAG;
+import org.apache.tez.dag.api.Edge;
+import org.apache.tez.dag.api.EdgeManagerPluginDescriptor;
+import org.apache.tez.dag.api.EdgeProperty;
+import org.apache.tez.dag.api.ProcessorDescriptor;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.UserPayload;
+import org.apache.tez.dag.api.Vertex;
+import org.apache.tez.dag.api.VertexManagerPluginDescriptor;
+import org.apache.tez.mapreduce.input.MRInput;
+import org.apache.tez.mapreduce.output.MROutput;
+import org.apache.tez.mapreduce.processor.SimpleMRProcessor;
+import org.apache.tez.runtime.api.ProcessorContext;
+import org.apache.tez.runtime.library.api.KeyValueReader;
+import org.apache.tez.runtime.library.api.KeyValueWriter;
+import org.apache.tez.runtime.library.api.Partitioner;
+import org.apache.tez.runtime.library.cartesianproduct.CartesianProductConfig;
+import org.apache.tez.runtime.library.cartesianproduct.CartesianProductEdgeManager;
+import org.apache.tez.runtime.library.cartesianproduct.CartesianProductVertexManager;
+import org.apache.tez.runtime.library.conf.UnorderedKVEdgeConfig;
+import org.apache.tez.runtime.library.conf.UnorderedPartitionedKVEdgeConfig;
+import org.apache.tez.runtime.library.processor.SimpleProcessor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.StringTokenizer;
+
+/**
+ * This job has three vertices: two Tokenizers and one JoinProcessor. Each Tokenizer handles one
+ * input directory and generates tokens. CustomPartitioner separates tokens into 2 partitions
+ * according to the parity of token's first char. Then JoinProcessor does cartesian product of
+ * partitioned token sets.
+ */
+public class CartesianProduct extends TezExampleBase {
+  private static final String INPUT = "Input1";
+  private static final String OUTPUT = "Output";
+  private static final String VERTEX1 = "Vertex1";
+  private static final String VERTEX2 = "Vertex2";
+  private static final String VERTEX3 = "Vertex3";
+  private static final String PARTITIONED = "-partitioned";
+  private static final String UNPARTITIONED = "-unpartitioned";
+  private static final Logger LOG = LoggerFactory.getLogger(CartesianProduct.class);
+  private static final int numPartition = 2;
+  private static final String[] sourceVertices = new String[] {VERTEX1, VERTEX2};
+
+  public static class TokenProcessor extends SimpleProcessor {
+    public TokenProcessor(ProcessorContext context) {
+      super(context);
+    }
+
+    @Override
+    public void run() throws Exception {
+      Preconditions.checkArgument(getInputs().size() == 1);
+      Preconditions.checkArgument(getOutputs().size() == 1);
+      KeyValueReader kvReader = (KeyValueReader) getInputs().get(INPUT).getReader();
+      KeyValueWriter kvWriter = (KeyValueWriter) getOutputs().get(VERTEX3).getWriter();
+      while (kvReader.next()) {
+        StringTokenizer itr = new StringTokenizer(kvReader.getCurrentValue().toString());
+        while (itr.hasMoreTokens()) {
+          kvWriter.write(new Text(itr.nextToken()), new IntWritable(1));
+        }
+      }
+    }
+  }
+
+  public static class JoinProcessor extends SimpleMRProcessor {
+    public JoinProcessor(ProcessorContext context) {
+      super(context);
+    }
+
+    @Override
+    public void run() throws Exception {
+      KeyValueWriter kvWriter = (KeyValueWriter) getOutputs().get(OUTPUT).getWriter();
+      KeyValueReader kvReader1 = (KeyValueReader) getInputs().get(VERTEX1).getReader();
+      KeyValueReader kvReader2 = (KeyValueReader) getInputs().get(VERTEX2).getReader();
+      Set<String> rightSet = new HashSet<>();
+
+      while (kvReader2.next()) {
+        rightSet.add(kvReader2.getCurrentKey().toString());
+      }
+
+      while (kvReader1.next()) {
+        String left = kvReader1.getCurrentKey().toString();
+        for (String right : rightSet) {
+          kvWriter.write(left, right);
+        }
+      }
+    }
+  }
+
+  public static class CustomPartitioner implements Partitioner {
+    @Override
+    public int getPartition(Object key, Object value, int numPartitions) {
+      return key.toString().charAt(0) % numPartition;
+    }
+  }
+
+  private DAG createDAG(TezConfiguration tezConf, String inputPath1, String inputPath2,
+                        String outputPath, boolean isPartitioned) throws IOException {
+    Vertex v1 = Vertex.create(VERTEX1, ProcessorDescriptor.create(TokenProcessor.class.getName()));
+    // turn off groupSplit so that each input file incurs one task
+    v1.addDataSource(INPUT,
+      MRInput.createConfigBuilder(new Configuration(tezConf), TextInputFormat.class, inputPath1)
+             .groupSplits(false).build());
+    Vertex v2 = Vertex.create(VERTEX2, ProcessorDescriptor.create(TokenProcessor.class.getName()));
+    v2.addDataSource(INPUT,
+      MRInput.createConfigBuilder(new Configuration(tezConf), TextInputFormat.class, inputPath2)
+              .groupSplits(false).build());
+    CartesianProductConfig cartesianProductConfig;
+    if (isPartitioned) {
+      Map<String, Integer> vertexPartitionMap = new HashMap<>();
+      for (String vertex : sourceVertices) {
+        vertexPartitionMap.put(vertex, numPartition);
+      }
+      cartesianProductConfig = new CartesianProductConfig(vertexPartitionMap);
+    } else {
+      cartesianProductConfig = new CartesianProductConfig(Arrays.asList(sourceVertices));
+    }
+    UserPayload userPayload = cartesianProductConfig.toUserPayload(tezConf);
+    Vertex v3 = Vertex.create(VERTEX3, ProcessorDescriptor.create(JoinProcessor.class.getName()));
+    v3.addDataSink(OUTPUT,
+      MROutput.createConfigBuilder(new Configuration(tezConf), TextOutputFormat.class, outputPath)
+              .build());
+    v3.setVertexManagerPlugin(
+      VertexManagerPluginDescriptor.create(CartesianProductVertexManager.class.getName())
+                                   .setUserPayload(userPayload));
+
+    DAG dag = DAG.create("CrossProduct").addVertex(v1).addVertex(v2).addVertex(v3);
+    EdgeManagerPluginDescriptor edgeManagerDescriptor =
+      EdgeManagerPluginDescriptor.create(CartesianProductEdgeManager.class.getName());
+    edgeManagerDescriptor.setUserPayload(userPayload);
+    EdgeProperty edgeProperty;
+    if (isPartitioned) {
+      UnorderedPartitionedKVEdgeConfig edgeConf =
+        UnorderedPartitionedKVEdgeConfig.newBuilder(Text.class.getName(), IntWritable.class.getName(),
+          CustomPartitioner.class.getName()).build();
+      edgeProperty = edgeConf.createDefaultCustomEdgeProperty(edgeManagerDescriptor);
+    } else {
+      UnorderedKVEdgeConfig edgeConf =
+        UnorderedKVEdgeConfig.newBuilder(Text.class.getName(), IntWritable.class.getName()).build();
+      edgeProperty = edgeConf.createDefaultCustomEdgeProperty(edgeManagerDescriptor);
+    }
+    dag.addEdge(Edge.create(v1, v3, edgeProperty)).addEdge(Edge.create(v2, v3, edgeProperty));
+
+    return dag;
+  }
+
+  @Override
+  protected void printUsage() {
+    System.err.println("Usage: args: ["+PARTITIONED + "|" + UNPARTITIONED
+      + " <input_dir1> <input_dir2> <output_dir>");
+  }
+
+  @Override
+  protected int validateArgs(String[] otherArgs) {
+    return (otherArgs.length != 4 || (!otherArgs[0].equals(PARTITIONED)
+      && !otherArgs[0].equals(UNPARTITIONED))) ? -1 : 0;
+  }
+
+  @Override
+  protected int runJob(String[] args, TezConfiguration tezConf,
+      TezClient tezClient) throws Exception {
+    DAG dag = createDAG(tezConf, args[1], args[2],
+        args[3], args[0].equals(PARTITIONED));
+    return runDag(dag, isCountersLog(), LOG);
+  }
+
+  public static void main(String[] args) throws Exception {
+    int res = ToolRunner.run(new Configuration(), new CartesianProduct(), args);
+    System.exit(res);
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/tez/blob/1a068b23/tez-runtime-library/findbugs-exclude.xml
----------------------------------------------------------------------
diff --git a/tez-runtime-library/findbugs-exclude.xml b/tez-runtime-library/findbugs-exclude.xml
index 4e15edc..d3b6245 100644
--- a/tez-runtime-library/findbugs-exclude.xml
+++ b/tez-runtime-library/findbugs-exclude.xml
@@ -123,6 +123,24 @@
   </Match>
 
   <Match>
+    <Class name="org.apache.tez.runtime.library.cartesianproduct.CartesianProductUserPayload$CartesianProductConfigProto"/>
+    <Field name="unknownFields"/>
+    <Bug pattern="SE_BAD_FIELD"/>
+  </Match>
+
+  <Match>
+    <Class name="org.apache.tez.runtime.library.cartesianproduct.CartesianProductUserPayload$CartesianProductConfigProto"/>
+    <Field name="PARSER"/>
+    <Bug pattern="MS_SHOULD_BE_FINAL"/>
+  </Match>
+
+  <Match>
+    <Class name="org.apache.tez.runtime.library.cartesianproduct.CartesianProductUserPayload$CartesianProductConfigProto$Builder"/>
+    <Method name="maybeForceBuilderInitialization"/>
+    <Bug pattern="UCF_USELESS_CONTROL_FLOW"/>
+  </Match>
+
+  <Match>
     <Bug pattern="EI_EXPOSE_REP"/>
     <Or>
       <Class name="org.apache.tez.runtime.library.common.sort.impl.ExteralSorter" />

http://git-wip-us.apache.org/repos/asf/tez/blob/1a068b23/tez-runtime-library/pom.xml
----------------------------------------------------------------------
diff --git a/tez-runtime-library/pom.xml b/tez-runtime-library/pom.xml
index 9831e50..b676933 100644
--- a/tez-runtime-library/pom.xml
+++ b/tez-runtime-library/pom.xml
@@ -129,6 +129,7 @@
                 <directory>${basedir}/src/main/proto</directory>
                 <includes>
                   <include>ShufflePayloads.proto</include>
+                  <include>CartesianProductPayload.proto</include>
                 </includes>
               </source>
               <output>${project.build.directory}/generated-sources/java</output>

http://git-wip-us.apache.org/repos/asf/tez/blob/1a068b23/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductCombination.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductCombination.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductCombination.java
new file mode 100644
index 0000000..a46993d
--- /dev/null
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductCombination.java
@@ -0,0 +1,164 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.runtime.library.cartesianproduct;
+
+import com.google.common.base.Preconditions;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Represent the combination of source partitions or tasks.
+ *
+ * For example, if we have two source vertices and each generates two partitions, we have 2*2=4
+ * destination tasks. The mapping from source partition/task to destination task is like this:
+ * <0, 0> -> 0, <0, 1> -> 1, <1, 0> -> 2, <1, 1> -> 3;
+ *
+ * Basically, it stores the source partition/task combination and can compute corresponding
+ * destination task. It can also figure out the source combination from a given destination task.
+ * Task id is mapped in the ascending order of combinations, starting from 0. <field>factor</field>
+ * is the helper array to compute task id, so task id = (combination) dot-product (factor)
+ *
+ * You can traverse all combinations with <method>firstTask</method> and <method>nextTask</method>,
+ * like <0, 0> -> <0, 1> -> <1, 0> -> <1, 1>.
+ *
+ * Or you can also traverse all combinations that have one specific partition with
+ * <method>firstTaskWithFixedPartition</method> and <method>nextTaskWithFixedPartition</method>,
+ * like <0, 1, 0> -> <0, 1, 1> -> <1, 1, 0> -> <1, 1, 1> (all combinations with 2nd vertex's 2nd
+ * partition).
+ */
+class CartesianProductCombination {
+  // numPartitions for partitioned case, numTasks for unpartitioned case
+  private int[] numPartitionOrTask;
+  // at which position (in source vertices array) our vertex is
+  private int positionId = -1;
+  // The i-th element Ci represents partition/task Ci of source vertex i.
+  private final Integer[] combination;
+  // the weight of each vertex when computing the task id
+  private final Integer[] factor;
+
+  public CartesianProductCombination(int[] numPartitionOrTask) {
+    this.numPartitionOrTask = Arrays.copyOf(numPartitionOrTask, numPartitionOrTask.length);
+    combination = new Integer[numPartitionOrTask.length];
+    factor = new Integer[numPartitionOrTask.length];
+    factor[factor.length-1] = 1;
+    for (int i = combination.length-2; i >= 0; i--) {
+      factor[i] = factor[i+1]*numPartitionOrTask[i+1];
+    }
+  }
+
+  public CartesianProductCombination(int[] numPartitionOrTask, int positionId) {
+    this(numPartitionOrTask);
+    this.positionId = positionId;
+  }
+
+  /**
+   * @return a read only view of current combination
+   */
+  public List<Integer> getCombination() {
+    return Collections.unmodifiableList(Arrays.asList(combination));
+  }
+
+  /**
+   * first combination with given partition id in current position
+   * @param partition partition id to put in current position
+   */
+  public void firstTaskWithFixedPartition(int partition) {
+    Preconditions.checkArgument(positionId >= 0 && positionId < combination.length);
+    Arrays.fill(combination, 0);
+    combination[positionId] = partition;
+  }
+
+  /**
+   * next combination that keeps the partition in current position fixed
+   * @return false if there is no next combination
+   */
+  public boolean nextTaskWithFixedPartition() {
+    Preconditions.checkArgument(positionId >= 0 && positionId < combination.length);
+    int i;
+    for (i = combination.length-1; i >= 0; i--) {
+      if (i != positionId && combination[i] != numPartitionOrTask[i]-1) {
+        break;
+      }
+    }
+
+    if (i < 0) {
+      return false;
+    }
+
+    combination[i]++;
+
+    for (i++; i < combination.length; i++) {
+      if (i != positionId) {
+        combination[i] = 0;
+      }
+    }
+
+    return true;
+  }
+
+  /**
+   * first combination, in which every position is 0
+   */
+  public void firstTask() {
+    Arrays.fill(combination, 0);
+  }
+
+  /**
+   * next combination in ascending order of task id
+   * @return false if there is no next combination
+   */
+  public boolean nextTask() {
+    int i;
+    for (i = combination.length-1; i >= 0; i--) {
+      if (combination[i] != numPartitionOrTask[i]-1) {
+        break;
+      }
+    }
+
+    if (i < 0) {
+      return false;
+    }
+
+    combination[i]++;
+    Arrays.fill(combination, i+1, combination.length, 0);
+    return true;
+  }
+
+  /**
+   * @return corresponding task id for current combination
+   */
+  public int getTaskId() {
+    int taskId = 0;
+    for (int i = 0; i < combination.length; i++) {
+      taskId += combination[i]*factor[i];
+    }
+    return taskId;
+  }
+
+  public static CartesianProductCombination fromTaskId(int[] numPartitionOrTask,
+                                                       int taskId) {
+    CartesianProductCombination result = new CartesianProductCombination(numPartitionOrTask);
+    for (int i = 0; i < result.combination.length; i++) {
+      result.combination[i] = taskId/result.factor[i];
+      taskId %= result.factor[i];
+    }
+    return result;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/1a068b23/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductConfig.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductConfig.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductConfig.java
new file mode 100644
index 0000000..b682182
--- /dev/null
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductConfig.java
@@ -0,0 +1,255 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.runtime.library.cartesianproduct;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.primitives.Ints;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.InvalidProtocolBufferException;
+import org.apache.hadoop.classification.InterfaceStability.Evolving;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.UserPayload;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.tez.runtime.library.cartesianproduct.CartesianProductUserPayload.CartesianProductConfigProto;
+
+/**
+ * <class>CartesianProductConfig</class> is used to configure both
+ * <class>CartesianProductVertexManager</class> and <class>CartesianProductEdgeManager</class>.
+ * Users need to specify at least the source vertices and, in the partitioned case, the number of
+ * partitions of each vertex's output. In the partitioned case, a filter may also be specified
+ * here (via <class>CartesianProductFilterDescriptor</class>). Users may also configure the
+ * min/max fractions used in slow start.
+ */
+@Evolving
+public class CartesianProductConfig {
+  private final boolean isPartitioned;
+  private final String[] sourceVertices;
+  private final int[] numPartitions;
+  private final CartesianProductFilterDescriptor filterDescriptor;
+
+  /**
+   * create config for unpartitioned case
+   * @param sourceVertices list of source vertices names
+   */
+  public CartesianProductConfig(List<String> sourceVertices) {
+    Preconditions.checkArgument(sourceVertices != null, "source vertices list cannot be null");
+    Preconditions.checkArgument(sourceVertices.size() > 1,
+      "there must be more than 1 source " + "vertices, currently only " + sourceVertices.size());
+
+    this.isPartitioned = false;
+    this.sourceVertices = sourceVertices.toArray(new String[sourceVertices.size()]);
+    this.numPartitions = null;
+    this.filterDescriptor = null;
+  }
+
+  /**
+   * create config for partitioned case without filter
+   * @param vertexPartitionMap the map from vertex name to its number of partitions
+   */
+  public CartesianProductConfig(Map<String, Integer> vertexPartitionMap) {
+    this(vertexPartitionMap, null);
+  }
+
+  /**
+   * create config for partitioned case with filter
+   * @param vertexPartitionMap the map from vertex name to its number of partitions
+   * @param filterDescriptor
+   */
+  public CartesianProductConfig(Map<String, Integer> vertexPartitionMap,
+                                CartesianProductFilterDescriptor filterDescriptor) {
+    Preconditions.checkArgument(vertexPartitionMap != null, "vertex-partition map cannot be null");
+    Preconditions.checkArgument(vertexPartitionMap.size() > 1,
+      "there must be more than 1 source " + "vertices, currently only " + vertexPartitionMap.size());
+
+    this.isPartitioned = true;
+    this.numPartitions = new int[vertexPartitionMap.size()];
+    this.sourceVertices = new String[vertexPartitionMap.size()];
+    this.filterDescriptor = filterDescriptor;
+
+    int i = 0;
+    for (Map.Entry<String, Integer> entry : vertexPartitionMap.entrySet()) {
+      this.sourceVertices[i] = entry.getKey();
+      this.numPartitions[i] = entry.getValue();
+      i++;
+    }
+
+    checkNumPartitions();
+  }
+
+  /**
+   * create config for partitioned case, with specified source vertices order
+   * @param numPartitions
+   * @param sourceVertices
+   * @param filterDescriptor
+   */
+  @VisibleForTesting
+  protected CartesianProductConfig(int[] numPartitions, String[] sourceVertices,
+                                   CartesianProductFilterDescriptor filterDescriptor) {
+    Preconditions.checkArgument(numPartitions != null, "partitions count array can't be null");
+    Preconditions.checkArgument(sourceVertices != null, "source vertices array can't be null");
+    Preconditions.checkArgument(numPartitions.length == sourceVertices.length,
+      "partitions count array(length: " + numPartitions.length + ") and source vertices array " +
+        "(length: " + sourceVertices.length + ") cannot have different length");
+    Preconditions.checkArgument(sourceVertices.length > 1,
+      "there must be more than 1 source " + "vertices, currently only " + sourceVertices.length);
+
+    this.isPartitioned = true;
+    this.numPartitions = numPartitions;
+    this.sourceVertices = sourceVertices;
+    this.filterDescriptor = filterDescriptor;
+
+    checkNumPartitions();
+  }
+
+  /**
+   * create config for both cases, used by subclass
+   */
+  protected CartesianProductConfig(boolean isPartitioned, int[] numPartitions,
+                                   String[] sourceVertices,
+                                   CartesianProductFilterDescriptor filterDescriptor) {
+    this.isPartitioned = isPartitioned;
+    this.numPartitions = numPartitions;
+    this.sourceVertices = sourceVertices;
+    this.filterDescriptor = filterDescriptor;
+  }
+
+  @VisibleForTesting
+  protected void checkNumPartitions() {
+    if (isPartitioned) {
+      boolean isUnpartitioned = true;
+      for (int i = 0; i < numPartitions.length; i++) {
+        Preconditions.checkArgument(this.numPartitions[i] > 0,
+          "Vertex " + sourceVertices[i] + "has negative (" + numPartitions[i] + ") partitions");
+        isUnpartitioned = isUnpartitioned && numPartitions[i] == 1;
+      }
+      Preconditions.checkArgument(!isUnpartitioned,
+        "every source vertex has 1 partition in a partitioned case");
+    } else {
+      Preconditions.checkArgument(this.numPartitions == null,
+        "partition counts should be null in unpartitioned case");
+    }
+  }
+
+  /**
+   * @return an unmodifiable list of source vertex names
+   */
+  public List<String> getSourceVertices() {
+    return Collections.unmodifiableList(Arrays.asList(sourceVertices));
+  }
+
+  /**
+   * @return the list of partition counts in the same order as the result of
+   *         <method>getSourceVertices</method>, or null if no partition counts are set
+   */
+  public List<Integer> getNumPartitions() {
+    if (this.numPartitions == null) {
+      return null;
+    }
+    return Collections.unmodifiableList(Ints.asList(this.numPartitions));
+  }
+
+  public boolean getIsPartitioned() {
+    return isPartitioned;
+  }
+
+  public CartesianProductFilterDescriptor getFilterDescriptor() {
+    return this.filterDescriptor;
+  }
+
+  public UserPayload toUserPayload(TezConfiguration conf) throws IOException {
+    return UserPayload.create(ByteBuffer.wrap(toProto(conf).toByteArray()));
+  }
+
+  protected CartesianProductConfigProto toProto(TezConfiguration conf) {
+    CartesianProductConfigProto.Builder builder =
+      CartesianProductConfigProto.newBuilder();
+    builder.setIsPartitioned(this.isPartitioned)
+      .addAllSourceVertices(Arrays.asList(sourceVertices));
+
+    if (isPartitioned) {
+      builder.addAllNumPartitions(Ints.asList(numPartitions));
+      if (filterDescriptor != null) {
+        builder.setFilterClassName(filterDescriptor.getClassName());
+        UserPayload filterUesrPayload = filterDescriptor.getUserPayload();
+        if (filterUesrPayload != null) {
+          builder.setFilterUserPayload(ByteString.copyFrom(filterUesrPayload.getPayload()));
+        }
+      }
+    }
+
+    builder.setMinFraction(
+      CartesianProductVertexManager.TEZ_CAERESIAN_PRODUCT_SLOW_START_MIN_FRACTION_DEFAULT);
+    builder.setMaxFraction(
+      CartesianProductVertexManager.TEZ_CAERESIAN_PRODUCT_SLOW_START_MAX_FRACTION_DEFAULT);
+
+    if (conf != null) {
+      builder.setMinFraction(conf.getFloat(
+        CartesianProductVertexManager.TEZ_CAERESIAN_PRODUCT_SLOW_START_MIN_FRACTION,
+        CartesianProductVertexManager.TEZ_CAERESIAN_PRODUCT_SLOW_START_MIN_FRACTION_DEFAULT));
+      builder.setMaxFraction(conf.getFloat(
+        CartesianProductVertexManager.TEZ_CAERESIAN_PRODUCT_SLOW_START_MAX_FRACTION,
+        CartesianProductVertexManager.TEZ_CAERESIAN_PRODUCT_SLOW_START_MAX_FRACTION_DEFAULT));
+    }
+    Preconditions.checkArgument(builder.getMinFraction() <= builder.getMaxFraction(),
+      "min fraction(" + builder.getMinFraction() + ") should be less than max fraction(" +
+        builder.getMaxFraction() + ") in cartesian product slow start");
+
+    return builder.build();
+  }
+
+  protected static CartesianProductConfigProto userPayloadToProto(UserPayload payload)
+    throws InvalidProtocolBufferException {
+    Preconditions.checkArgument(payload != null, "UserPayload is null");
+    Preconditions.checkArgument(payload.getPayload() != null, "UserPayload carreis null payload");
+    return
+      CartesianProductConfigProto.parseFrom(ByteString.copyFrom(payload.getPayload()));
+  }
+
+  protected static CartesianProductConfig fromUserPayload(UserPayload payload)
+    throws InvalidProtocolBufferException {
+    return fromProto(userPayloadToProto(payload));
+  }
+
+  protected static CartesianProductConfig fromProto(
+    CartesianProductConfigProto proto) {
+    if (!proto.getIsPartitioned()) {
+      return new CartesianProductConfig(proto.getSourceVerticesList());
+    } else {
+      String[] sourceVertices = new String[proto.getSourceVerticesList().size()];
+      proto.getSourceVerticesList().toArray(sourceVertices);
+      CartesianProductFilterDescriptor filterDescriptor = null;
+      if (proto.hasFilterClassName()) {
+        filterDescriptor = new CartesianProductFilterDescriptor(proto.getFilterClassName());
+        if (proto.hasFilterUserPayload()) {
+          filterDescriptor.setUserPayload(
+            UserPayload.create(ByteBuffer.wrap(proto.getFilterUserPayload().toByteArray())));
+        }
+      }
+      return new CartesianProductConfig(Ints.toArray(proto.getNumPartitionsList()),
+        sourceVertices, filterDescriptor);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/1a068b23/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductEdgeManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductEdgeManager.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductEdgeManager.java
new file mode 100644
index 0000000..96cce94
--- /dev/null
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductEdgeManager.java
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.runtime.library.cartesianproduct;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import org.apache.tez.dag.api.EdgeManagerPluginContext;
+import org.apache.tez.dag.api.EdgeManagerPluginOnDemand;
+import org.apache.tez.dag.api.TezException;
+
+import javax.annotation.Nullable;
+
+/**
+ * Edge manager plugin for cartesian product edges. It is a thin facade: during initialization
+ * it reads the config from the user payload, instantiates either the partitioned or the
+ * unpartitioned implementation, and afterwards forwards every call to that implementation.
+ */
+public class CartesianProductEdgeManager extends EdgeManagerPluginOnDemand {
+  private CartesianProductEdgeManagerReal edgeManagerReal;
+
+  public CartesianProductEdgeManager(EdgeManagerPluginContext context) {
+    super(context);
+  }
+
+  /**
+   * Parses the edge manager config from the context's user payload and sets up the matching
+   * real implementation.
+   */
+  @Override
+  public void initialize() throws Exception {
+    EdgeManagerPluginContext pluginContext = getContext();
+    Preconditions.checkArgument(pluginContext.getUserPayload() != null);
+    CartesianProductEdgeManagerConfig config =
+      CartesianProductEdgeManagerConfig.fromUserPayload(pluginContext.getUserPayload());
+    // no need to check config because config comes from VM and is already checked by VM
+    if (config.getIsPartitioned()) {
+      edgeManagerReal = new CartesianProductEdgeManagerPartitioned(pluginContext);
+    } else {
+      edgeManagerReal = new CartesianProductEdgeManagerUnpartitioned(pluginContext);
+    }
+    edgeManagerReal.initialize(config);
+  }
+
+  /** @return the implementation all calls are forwarded to; null before initialization */
+  @VisibleForTesting
+  protected CartesianProductEdgeManagerReal getEdgeManagerReal() {
+    return edgeManagerReal;
+  }
+
+  // ---- everything below simply forwards to the real implementation ----
+
+  @Override
+  public void prepareForRouting() throws Exception {
+    edgeManagerReal.prepareForRouting();
+  }
+
+  @Override
+  public int routeInputErrorEventToSource(int destTaskId, int failedInputId) throws Exception {
+    return edgeManagerReal.routeInputErrorEventToSource(destTaskId, failedInputId);
+  }
+
+  @Nullable
+  @Override
+  public EventRouteMetadata routeDataMovementEventToDestination(
+    int srcTaskId, int srcOutputId, int destTaskId) throws Exception {
+    return edgeManagerReal.routeDataMovementEventToDestination(srcTaskId, srcOutputId, destTaskId);
+  }
+
+  @Nullable
+  @Override
+  public EventRouteMetadata routeCompositeDataMovementEventToDestination(
+    int srcTaskId, int destTaskId) throws Exception {
+    return edgeManagerReal.routeCompositeDataMovementEventToDestination(srcTaskId, destTaskId);
+  }
+
+  @Nullable
+  @Override
+  public EventRouteMetadata routeInputSourceTaskFailedEventToDestination(
+    int srcTaskId, int destTaskId) throws Exception {
+    return edgeManagerReal.routeInputSourceTaskFailedEventToDestination(srcTaskId, destTaskId);
+  }
+
+  @Override
+  public int getNumDestinationTaskPhysicalInputs(int destTaskId) {
+    return edgeManagerReal.getNumDestinationTaskPhysicalInputs(destTaskId);
+  }
+
+  @Override
+  public int getNumSourceTaskPhysicalOutputs(int srcTaskId) {
+    return edgeManagerReal.getNumSourceTaskPhysicalOutputs(srcTaskId);
+  }
+
+  @Override
+  public int getNumDestinationConsumerTasks(int sourceTaskIndex) {
+    return edgeManagerReal.getNumDestinationConsumerTasks(sourceTaskIndex);
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tez/blob/1a068b23/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductEdgeManagerConfig.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductEdgeManagerConfig.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductEdgeManagerConfig.java
new file mode 100644
index 0000000..d48a0bb
--- /dev/null
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductEdgeManagerConfig.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.runtime.library.cartesianproduct;
+
+import com.google.common.primitives.Ints;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.InvalidProtocolBufferException;
+import org.apache.tez.dag.api.UserPayload;
+
+import java.nio.ByteBuffer;
+
+import static org.apache.tez.runtime.library.cartesianproduct.CartesianProductUserPayload.CartesianProductConfigProto;
+
+/**
+ * Configuration consumed by the cartesian product edge managers. On top of the fields of
+ * {@link CartesianProductConfig} it carries the task counts read from the payload's
+ * {@code numTasks} list.
+ */
+class CartesianProductEdgeManagerConfig extends CartesianProductConfig {
+  private final int[] numTasks;
+
+  /**
+   * @param isPartitioned whether the partitioned implementation should be used
+   * @param sourceVertices names of the source vertices
+   * @param numPartitions partition counts, may be null
+   * @param numTasks task counts, may be null
+   * @param filterDescriptor descriptor of the combination filter, may be null
+   */
+  protected CartesianProductEdgeManagerConfig(boolean isPartitioned, String[] sourceVertices,
+                                            int[] numPartitions, int[] numTasks,
+                                            CartesianProductFilterDescriptor filterDescriptor) {
+    super(isPartitioned, numPartitions, sourceVertices, filterDescriptor);
+    this.numTasks = numTasks;
+  }
+
+  /** @return the task counts given at construction; null when the payload carried none */
+  public int[] getNumTasks() {
+    return this.numTasks;
+  }
+
+  /**
+   * Deserializes an edge manager config from a user payload holding a
+   * {@code CartesianProductConfigProto}.
+   *
+   * @param payload payload to decode
+   * @return the decoded config; empty repeated fields are mapped to null arrays
+   * @throws InvalidProtocolBufferException if the payload bytes cannot be parsed
+   */
+  public static CartesianProductEdgeManagerConfig fromUserPayload(UserPayload payload)
+    throws InvalidProtocolBufferException {
+    CartesianProductConfigProto proto =
+      CartesianProductConfigProto.parseFrom(ByteString.copyFrom(payload.getPayload()));
+
+    boolean isPartitioned = proto.getIsPartitioned();
+    String[] sourceVertices = new String[proto.getSourceVerticesList().size()];
+    proto.getSourceVerticesList().toArray(sourceVertices);
+    int[] numPartitions =
+      proto.getNumPartitionsCount() == 0 ? null : Ints.toArray(proto.getNumPartitionsList());
+    CartesianProductFilterDescriptor filterDescriptor = null;
+    if (proto.hasFilterClassName()) {
+      filterDescriptor = new CartesianProductFilterDescriptor(proto.getFilterClassName());
+      // Only attach the payload when a descriptor exists; a payload without a class name would
+      // otherwise dereference null. This mirrors CartesianProductConfig.fromProto.
+      if (proto.hasFilterUserPayload()) {
+        filterDescriptor.setUserPayload(
+          UserPayload.create(ByteBuffer.wrap(proto.getFilterUserPayload().toByteArray())));
+      }
+    }
+    int[] numTasks =
+      proto.getNumTasksCount() == 0 ? null : Ints.toArray(proto.getNumTasksList());
+    return new CartesianProductEdgeManagerConfig(isPartitioned, sourceVertices, numPartitions,
+      numTasks, filterDescriptor);
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tez/blob/1a068b23/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductEdgeManagerPartitioned.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductEdgeManagerPartitioned.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductEdgeManagerPartitioned.java
new file mode 100644
index 0000000..644d5af
--- /dev/null
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductEdgeManagerPartitioned.java
@@ -0,0 +1,124 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.runtime.library.cartesianproduct;
+
+import com.google.common.primitives.Ints;
+import org.apache.tez.common.ReflectionUtils;
+import org.apache.tez.dag.api.EdgeManagerPluginContext;
+import org.apache.tez.dag.api.EdgeManagerPluginOnDemand.EventRouteMetadata;
+import org.apache.tez.dag.api.UserPayload;
+
+import javax.annotation.Nullable;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Edge manager implementation for the partitioned cartesian product. Every destination task id
+ * is first translated through {@code taskIdMapping} (the task ids of the combinations accepted
+ * by the optional filter) and then decoded into a combination of partitions; the entry at this
+ * edge's position tells which partition of the source vertex that destination task reads.
+ */
+class CartesianProductEdgeManagerPartitioned extends CartesianProductEdgeManagerReal {
+  private int positionId;
+  private CartesianProductFilter filter;
+  private int[] taskIdMapping;
+  private CartesianProductEdgeManagerConfig config;
+  private int[] numPartitions;
+
+  public CartesianProductEdgeManagerPartitioned(EdgeManagerPluginContext context) {
+    super(context);
+  }
+
+  /**
+   * Stores the config, locates this edge's source vertex in the configured vertex list,
+   * instantiates the filter (if one is configured) and precomputes the task id mapping.
+   */
+  @Override
+  public void initialize(CartesianProductEdgeManagerConfig config) throws Exception {
+    this.config = config;
+    this.positionId = config.getSourceVertices().indexOf(getContext().getSourceVertexName());
+    this.numPartitions = Ints.toArray(config.getNumPartitions());
+
+    CartesianProductFilterDescriptor descriptor = config.getFilterDescriptor();
+    if (descriptor != null) {
+      // the filter class is expected to expose a constructor taking a single UserPayload
+      Class[] ctorParamTypes = new Class[] {UserPayload.class};
+      UserPayload[] ctorArgs = new UserPayload[] {descriptor.getUserPayload()};
+      filter = ReflectionUtils.createClazzInstance(
+        descriptor.getClassName(), ctorParamTypes, ctorArgs);
+    }
+    generateTaskIdMapping();
+  }
+
+  @Override
+  public int routeInputErrorEventToSource(int destTaskId, int failedInputId) throws Exception {
+    return failedInputId;
+  }
+
+  @Override
+  public EventRouteMetadata routeDataMovementEventToDestination(int srcTaskId, int srcOutputId,
+                                                                int destTaskId) throws Exception {
+    // only the output matching the partition consumed by the destination task is routed
+    if (srcOutputId != partitionFor(destTaskId)) {
+      return null;
+    }
+    return EventRouteMetadata.create(1, new int[]{srcTaskId});
+  }
+
+  @Nullable
+  @Override
+  public EventRouteMetadata routeCompositeDataMovementEventToDestination(int srcTaskId,
+                                                                         int destTaskId)
+    throws Exception {
+    int[] targetIndices = new int[]{srcTaskId};
+    int[] sourceIndices = new int[]{partitionFor(destTaskId)};
+    return EventRouteMetadata.create(1, targetIndices, sourceIndices);
+  }
+
+  @Nullable
+  @Override
+  public EventRouteMetadata routeInputSourceTaskFailedEventToDestination(int srcTaskId,
+                                                                         int destTaskId)
+    throws Exception {
+    return EventRouteMetadata.create(1, new int[]{srcTaskId});
+  }
+
+  @Override
+  public int getNumDestinationTaskPhysicalInputs(int destTaskId) {
+    return getContext().getSourceVertexNumTasks();
+  }
+
+  @Override
+  public int getNumSourceTaskPhysicalOutputs(int srcTaskId) {
+    return numPartitions[positionId];
+  }
+
+  @Override
+  public int getNumDestinationConsumerTasks(int sourceTaskIndex) {
+    return getContext().getDestinationVertexNumTasks();
+  }
+
+  /**
+   * Walks over every combination of partitions and records the task id of each combination
+   * that the filter accepts (all of them when no filter is configured).
+   */
+  private void generateTaskIdMapping() {
+    List<String> vertexNames = config.getSourceVertices();
+    // the same map instance is refilled and handed to the filter for every combination
+    Map<String, Integer> partitionByVertex = new HashMap<>();
+    List<Integer> acceptedTaskIds = new ArrayList<>();
+
+    CartesianProductCombination cursor = new CartesianProductCombination(numPartitions);
+    cursor.firstTask();
+    do {
+      int position = 0;
+      for (String vertexName : vertexNames) {
+        partitionByVertex.put(vertexName, cursor.getCombination().get(position));
+        position++;
+      }
+      boolean accepted = filter == null || filter.isValidCombination(partitionByVertex);
+      if (accepted) {
+        acceptedTaskIds.add(cursor.getTaskId());
+      }
+    } while (cursor.nextTask());
+    taskIdMapping = Ints.toArray(acceptedTaskIds);
+  }
+
+  /**
+   * @param destTaskId real id of a destination task
+   * @return the partition of this edge's source vertex consumed by that destination task
+   */
+  private int partitionFor(int destTaskId) {
+    int idealTaskId = taskIdMapping[destTaskId];
+    return CartesianProductCombination.fromTaskId(numPartitions, idealTaskId)
+      .getCombination().get(positionId);
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tez/blob/1a068b23/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductEdgeManagerReal.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductEdgeManagerReal.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductEdgeManagerReal.java
new file mode 100644
index 0000000..705db05
--- /dev/null
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductEdgeManagerReal.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.runtime.library.cartesianproduct;
+
+import org.apache.tez.dag.api.EdgeManagerPluginContext;
+import org.apache.tez.dag.api.EdgeManagerPluginOnDemand.EventRouteMetadata;
+
+/**
+ * Base class of the cartesian product edge manager implementations.
+ * {@link CartesianProductEdgeManager} creates one subclass instance, calls
+ * {@link #initialize(CartesianProductEdgeManagerConfig)} on it and then forwards each of its own
+ * same-named routing and counting methods to the methods declared here.
+ */
+abstract class CartesianProductEdgeManagerReal {
+  private final EdgeManagerPluginContext context;
+
+  /**
+   * @param context plugin context kept for use by subclasses
+   */
+  public CartesianProductEdgeManagerReal(EdgeManagerPluginContext context) {
+    this.context = context;
+  }
+
+  /** @return the plugin context passed to the constructor */
+  public EdgeManagerPluginContext getContext() {
+    return this.context;
+  }
+
+  /**
+   * Sets up the implementation from the parsed edge manager config.
+   *
+   * @param config config parsed from the edge's user payload
+   * @throws Exception if the implementation cannot be set up
+   */
+  public abstract void initialize(CartesianProductEdgeManagerConfig config) throws Exception;
+
+  /** Hook invoked before routing starts; does nothing unless a subclass overrides it. */
+  public void prepareForRouting() throws Exception {}
+
+  /**
+   * @param destTaskId id of the destination task reporting the error
+   * @param failedInputId index of the failed input on that destination task
+   * @return index of the source task the error should be routed to
+   */
+  public abstract int routeInputErrorEventToSource(int destTaskId, int failedInputId)
+    throws Exception;
+
+  /**
+   * @param srcTaskId id of the source task that produced the event
+   * @param srcOutputId index of the source task's output the event refers to
+   * @param destTaskId id of the candidate destination task
+   * @return routing metadata for the destination task; the implementations in this package
+   *         return null when the event is not meant for that task
+   */
+  public abstract EventRouteMetadata routeDataMovementEventToDestination(int srcTaskId,
+                                                                         int srcOutputId,
+                                                                         int destTaskId)
+    throws Exception;
+
+  /**
+   * @param srcTaskId id of the source task that produced the composite event
+   * @param destTaskId id of the candidate destination task
+   * @return routing metadata for the destination task, possibly null
+   */
+  public abstract EventRouteMetadata routeCompositeDataMovementEventToDestination(int srcTaskId,
+                                                                                  int destTaskId)
+    throws Exception;
+
+  /**
+   * @param srcTaskId id of the failed source task
+   * @param destTaskId id of the candidate destination task
+   * @return routing metadata for the destination task, possibly null
+   */
+  public abstract EventRouteMetadata routeInputSourceTaskFailedEventToDestination(int srcTaskId,
+                                                                                  int destTaskId)
+    throws Exception;
+
+  /** @return number of physical inputs the given destination task has on this edge */
+  public abstract int getNumDestinationTaskPhysicalInputs(int destTaskId);
+
+  /** @return number of physical outputs the given source task has on this edge */
+  public abstract int getNumSourceTaskPhysicalOutputs(int srcTaskId);
+
+  /** @return number of destination tasks consuming the given source task's output */
+  public abstract int getNumDestinationConsumerTasks(int sourceTaskIndex);
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/1a068b23/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductEdgeManagerUnpartitioned.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductEdgeManagerUnpartitioned.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductEdgeManagerUnpartitioned.java
new file mode 100644
index 0000000..cea4142
--- /dev/null
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductEdgeManagerUnpartitioned.java
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.runtime.library.cartesianproduct;
+
+import org.apache.tez.dag.api.EdgeManagerPluginContext;
+import org.apache.tez.dag.api.EdgeManagerPluginOnDemand;
+
+import javax.annotation.Nullable;
+import java.util.Arrays;
+
+import static org.apache.tez.dag.api.EdgeManagerPluginOnDemand.*;
+
+/**
+ * Edge manager implementation for the unpartitioned cartesian product. A destination task id is
+ * decoded with {@code CartesianProductCombination.fromTaskId(numTasks, destTaskId)}; the entry
+ * at this edge's position is the index of the single source task that destination task reads.
+ * Each source task has one physical output and each destination task has one physical input on
+ * this edge.
+ */
+class CartesianProductEdgeManagerUnpartitioned extends CartesianProductEdgeManagerReal {
+  private int positionId;
+  private int[] numTasks;
+  private int numDestinationConsumerTasks;
+
+  public CartesianProductEdgeManagerUnpartitioned(EdgeManagerPluginContext context) {
+    super(context);
+  }
+
+  /**
+   * Records this edge's position among the configured source vertices and the task counts, and
+   * precomputes how many destination tasks consume each source task.
+   *
+   * @param config edge manager config; its task counts may be null, in which case the consumer
+   *               count stays 0
+   */
+  @Override
+  public void initialize(CartesianProductEdgeManagerConfig config) {
+    positionId = config.getSourceVertices().indexOf(getContext().getSourceVertexName());
+    this.numTasks = config.getNumTasks();
+
+    if (numTasks != null && numTasks[positionId] != 0) {
+      // Multiply the task counts of all OTHER positions. This equals
+      // (product of all counts) / numTasks[positionId] but never builds the larger
+      // intermediate product, which could overflow int although the result fits.
+      int consumers = 1;
+      for (int i = 0; i < numTasks.length; i++) {
+        if (i != positionId) {
+          consumers *= numTasks[i];
+        }
+      }
+      numDestinationConsumerTasks = consumers;
+    }
+  }
+
+  @Override
+  public int routeInputErrorEventToSource(int destTaskId, int failedInputId) throws Exception {
+    return sourceTaskIndexFor(destTaskId);
+  }
+
+  /**
+   * @return metadata targeting input 0 when {@code srcTaskId} is the source task read by
+   *         {@code destTaskId}, otherwise null
+   */
+  @Nullable
+  @Override
+  public EventRouteMetadata routeDataMovementEventToDestination(int srcTaskId, int srcOutputId,
+                                                                int destTaskId) throws Exception {
+    return sourceTaskIndexFor(destTaskId) == srcTaskId
+      ? EventRouteMetadata.create(1, new int[]{0}) : null;
+  }
+
+  /**
+   * @return metadata mapping source output 0 to input 0 when {@code srcTaskId} is the source
+   *         task read by {@code destTaskId}, otherwise null
+   */
+  @Nullable
+  @Override
+  public EventRouteMetadata routeCompositeDataMovementEventToDestination(int srcTaskId,
+                                                                         int destTaskId)
+    throws Exception {
+    return sourceTaskIndexFor(destTaskId) == srcTaskId
+      ? EventRouteMetadata.create(1, new int[]{0}, new int[]{0}) : null;
+  }
+
+  /**
+   * @return metadata targeting input 0 when {@code srcTaskId} is the source task read by
+   *         {@code destTaskId}, otherwise null
+   */
+  @Nullable
+  @Override
+  public EventRouteMetadata routeInputSourceTaskFailedEventToDestination(int srcTaskId,
+                                                                         int destTaskId)
+    throws Exception {
+    return sourceTaskIndexFor(destTaskId) == srcTaskId
+      ? EventRouteMetadata.create(1, new int[]{0}) : null;
+  }
+
+  @Override
+  public int getNumDestinationTaskPhysicalInputs(int destTaskId) {
+    return 1;
+  }
+
+  @Override
+  public int getNumSourceTaskPhysicalOutputs(int srcTaskId) {
+    return 1;
+  }
+
+  @Override
+  public int getNumDestinationConsumerTasks(int sourceTaskIndex) {
+    return numDestinationConsumerTasks;
+  }
+
+  /**
+   * @param destTaskId id of a destination task
+   * @return index of the task of this edge's source vertex that the destination task reads
+   */
+  private int sourceTaskIndexFor(int destTaskId) {
+    return CartesianProductCombination.fromTaskId(numTasks, destTaskId)
+      .getCombination().get(positionId);
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tez/blob/1a068b23/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductFilter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductFilter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductFilter.java
new file mode 100644
index 0000000..5b6456e
--- /dev/null
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductFilter.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.runtime.library.cartesianproduct;
+
+import org.apache.tez.dag.api.UserPayload;
+
+import java.util.Map;
+
+import static org.apache.hadoop.classification.InterfaceStability.Evolving;
+
+/**
+ * Base class for custom filters over combinations of partitions. Extend it and override
+ * {@link #isValidCombination(Map)} to decide which combinations are kept.
+ * <p>
+ * The partitioned edge manager in this package instantiates the configured subclass
+ * reflectively through a constructor taking a single {@link UserPayload}, so subclasses should
+ * keep a public constructor with that signature.
+ */
+@Evolving
+public abstract class CartesianProductFilter {
+  // assigned once in the constructor and never changed afterwards
+  private final UserPayload userPayload;
+
+  /**
+   * @param payload user payload made available to the filter, may be null
+   */
+  public CartesianProductFilter(UserPayload payload) {
+    this.userPayload = payload;
+  }
+
+  /**
+   * @param vertexPartitionMap the map from vertex name to partition id
+   * @return whether this combination of partitions is valid
+   */
+  public abstract boolean isValidCombination(Map<String, Integer> vertexPartitionMap);
+
+  /** @return the payload passed to the constructor */
+  public UserPayload getUserPayload() {
+    return userPayload;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/1a068b23/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductFilterDescriptor.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductFilterDescriptor.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductFilterDescriptor.java
new file mode 100644
index 0000000..bc81755
--- /dev/null
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductFilterDescriptor.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.runtime.library.cartesianproduct;
+
+import org.apache.tez.dag.api.EntityDescriptor;
+
+/**
+ * Descriptor naming a {@link CartesianProductFilter} implementation class. The partitioned edge
+ * manager in this package reads the class name and user payload from it to instantiate the
+ * filter reflectively.
+ */
+public class CartesianProductFilterDescriptor
+  extends EntityDescriptor<CartesianProductFilterDescriptor> {
+
+  /**
+   * @param filterClassName class name of the filter, handed to the {@link EntityDescriptor}
+   *                        constructor
+   */
+  public CartesianProductFilterDescriptor(String filterClassName) {
+    super(filterClassName);
+  }
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/1a068b23/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductVertexManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductVertexManager.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductVertexManager.java
new file mode 100644
index 0000000..659d3b7
--- /dev/null
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductVertexManager.java
@@ -0,0 +1,139 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.runtime.library.cartesianproduct;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import org.apache.tez.dag.api.EdgeProperty;
+import org.apache.tez.dag.api.InputDescriptor;
+import org.apache.tez.dag.api.TezException;
+import org.apache.tez.dag.api.VertexManagerPlugin;
+import org.apache.tez.dag.api.VertexManagerPluginContext;
+import org.apache.tez.dag.api.event.VertexStateUpdate;
+import org.apache.tez.runtime.api.Event;
+import org.apache.tez.runtime.api.TaskAttemptIdentifier;
+import org.apache.tez.runtime.api.events.VertexManagerEvent;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * This VM wrap a real vertex manager implementation object. It choose whether it's partitioned or
+ * unpartitioned implementation according to the config. All method invocations are actually
+ * redirected to real implementation.
+ */
+public class CartesianProductVertexManager extends VertexManagerPlugin {
+  public static final String TEZ_CAERESIAN_PRODUCT_SLOW_START_MIN_FRACTION =
+    "tez.cartesian-product.min-src-fraction";
+  public static final float TEZ_CAERESIAN_PRODUCT_SLOW_START_MIN_FRACTION_DEFAULT = 0.25f;
+  public static final String TEZ_CAERESIAN_PRODUCT_SLOW_START_MAX_FRACTION =
+    "tez.cartesian-product.min-src-fraction";
+  public static final float TEZ_CAERESIAN_PRODUCT_SLOW_START_MAX_FRACTION_DEFAULT = 0.75f;
+
+  private CartesianProductVertexManagerReal vertexManagerReal = null;
+
+  public CartesianProductVertexManager(VertexManagerPluginContext context) {
+    super(context);
+  }
+
+  @Override
+  public void initialize() throws Exception {
+    CartesianProductVertexManagerConfig config =
+      CartesianProductVertexManagerConfig.fromUserPayload(getContext().getUserPayload());
+    // check whether DAG and config are is consistent
+    Map<String, EdgeProperty> edgePropertyMap = getContext().getInputVertexEdgeProperties();
+    Set<String> sourceVerticesDAG = edgePropertyMap.keySet();
+    Set<String> sourceVerticesConfig = new HashSet<>();
+    sourceVerticesConfig.addAll(config.getSourceVertices());
+
+    for (Map.Entry<String, EdgeProperty> entry : edgePropertyMap.entrySet()) {
+      if (entry.getValue().getEdgeManagerDescriptor().getClassName()
+        .equals(CartesianProductEdgeManager.class.getName())) {
+        Preconditions.checkArgument(sourceVerticesDAG.contains(entry.getKey()),
+          entry.getKey() + " has CartesianProductEdgeManager but isn't in " +
+            "CartesianProductVertexManagerConfig");
+      } else {
+        Preconditions.checkArgument(!sourceVerticesDAG.contains(entry.getKey()),
+          entry.getKey() + " has no CartesianProductEdgeManager but is in " +
+            "CartesianProductVertexManagerConfig");
+      }
+    }
+
+    for (String vertex : sourceVerticesConfig) {
+      Preconditions.checkArgument(sourceVerticesDAG.contains(vertex),
+        vertex + " is in CartesianProductVertexManagerConfig but not a source vertex in DAG");
+      Preconditions.checkArgument(
+        edgePropertyMap.get(vertex).getEdgeManagerDescriptor().getClassName()
+          .equals(CartesianProductEdgeManager.class.getName()),
+        vertex + " is in CartesianProductVertexManagerConfig and a source vertex, but has no " +
+          "CartesianProductEdgeManager");
+    }
+
+    vertexManagerReal = config.getIsPartitioned()
+      ? new CartesianProductVertexManagerPartitioned(getContext())
+      : new CartesianProductVertexManagerUnpartitioned(getContext());
+    vertexManagerReal.initialize(config);
+  }
+
+  @VisibleForTesting
+  protected CartesianProductVertexManagerReal getVertexManagerReal() {
+    return this.vertexManagerReal;
+  }
+
+  /**
+   * no op currently, will be used for locality based optimization in future
+   * @param vmEvent
+   * @throws Exception
+   */
+  @Override
+  public void onVertexManagerEventReceived(VertexManagerEvent vmEvent) throws Exception {
+
+    vertexManagerReal.onVertexManagerEventReceived(vmEvent);
+  }
+
+  /**
+   * Currently direct input to cartesian product vertex is not supported
+   * @param inputName
+   * @param inputDescriptor
+   * @param events
+   * @throws Exception
+   */
+  @Override
+  public void onRootVertexInitialized(String inputName, InputDescriptor inputDescriptor,
+                                      List<Event> events) throws Exception {
+    throw new TezException("Direct input to cartesian product vertex is not supported yet");
+  }
+
+  @Override
+  public void onVertexStarted(List<TaskAttemptIdentifier> completions) throws Exception {
+    vertexManagerReal.onVertexStarted(completions);
+  }
+
+  @Override
+  public void onVertexStateUpdated(VertexStateUpdate stateUpdate) throws Exception{
+    vertexManagerReal.onVertexStateUpdated(stateUpdate);
+  }
+
+  @Override
+  public void onSourceTaskCompleted(TaskAttemptIdentifier attempt) throws Exception {
+    vertexManagerReal.onSourceTaskCompleted(attempt);
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tez/blob/1a068b23/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductVertexManagerConfig.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductVertexManagerConfig.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductVertexManagerConfig.java
new file mode 100644
index 0000000..b324524
--- /dev/null
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductVertexManagerConfig.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.runtime.library.cartesianproduct;
+
+import com.google.common.base.Preconditions;
+import com.google.common.primitives.Ints;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.InvalidProtocolBufferException;
+import org.apache.tez.dag.api.UserPayload;
+
+import java.nio.ByteBuffer;
+
+import static org.apache.tez.runtime.library.cartesianproduct.CartesianProductUserPayload.*;
+
+/**
+ * Configuration used by the cartesian product vertex manager. On top of the settings passed
+ * to {@link CartesianProductConfig} it carries the min/max fractions used for slow start.
+ */
+class CartesianProductVertexManagerConfig extends CartesianProductConfig {
+  private final float minFraction;
+  private final float maxFraction;
+
+  /**
+   * @param isPartitioned passed through to the super class constructor
+   * @param sourceVertices passed through to the super class constructor
+   * @param numPartitions passed through to the super class constructor, may be null
+   * @param minFraction slow start min fraction, must not exceed {@code maxFraction}
+   * @param maxFraction slow start max fraction
+   * @param filterDescriptor passed through to the super class constructor, may be null
+   * @throws IllegalArgumentException if {@code minFraction > maxFraction}
+   */
+  public CartesianProductVertexManagerConfig(boolean isPartitioned, String[] sourceVertices,
+                                             int[] numPartitions,
+                                             float minFraction, float maxFraction,
+                                             CartesianProductFilterDescriptor filterDescriptor) {
+    super(isPartitioned, numPartitions, sourceVertices, filterDescriptor);
+    Preconditions.checkArgument(minFraction <= maxFraction,
+      "min fraction(" + minFraction + ") should be less than max fraction(" +
+        maxFraction  + ") in cartesian product slow start");
+    this.minFraction = minFraction;
+    this.maxFraction = maxFraction;
+  }
+
+  /**
+   * @return the slow start min fraction given at construction
+   */
+  public float getMinFraction() {
+    return minFraction;
+  }
+
+  /**
+   * @return the slow start max fraction given at construction
+   */
+  public float getMaxFraction() {
+    return maxFraction;
+  }
+
+  /**
+   * Builds a config object from a user payload that holds a serialized
+   * {@code CartesianProductConfigProto}.
+   *
+   * @param payload user payload wrapping the serialized proto
+   * @return config populated from the proto fields
+   * @throws InvalidProtocolBufferException if the payload cannot be parsed as the proto
+   * @throws IllegalArgumentException if a filter user payload is present without a filter
+   *                                  class name, or if the constructor checks fail
+   */
+  public static CartesianProductVertexManagerConfig fromUserPayload(UserPayload payload)
+    throws InvalidProtocolBufferException {
+    CartesianProductConfigProto proto =
+      CartesianProductConfigProto.parseFrom(ByteString.copyFrom(payload.getPayload()));
+
+    boolean isPartitioned = proto.getIsPartitioned();
+    String[] sourceVertices = new String[proto.getSourceVerticesList().size()];
+    proto.getSourceVerticesList().toArray(sourceVertices);
+    // an empty partition list is represented as null
+    int[] numPartitions =
+      proto.getNumPartitionsCount() == 0 ? null : Ints.toArray(proto.getNumPartitionsList());
+
+    CartesianProductFilterDescriptor filterDescriptor = null;
+    if (proto.hasFilterClassName()) {
+      filterDescriptor = new CartesianProductFilterDescriptor(proto.getFilterClassName());
+    }
+    if (proto.hasFilterUserPayload()) {
+      // a payload without a class name has no descriptor to attach to; fail with a clear
+      // message instead of a bare NullPointerException
+      Preconditions.checkArgument(filterDescriptor != null,
+        "filter user payload is set but filter class name is missing"
+          + " in cartesian product config");
+      filterDescriptor.setUserPayload(
+        UserPayload.create(ByteBuffer.wrap(proto.getFilterUserPayload().toByteArray())));
+    }
+
+    float minFraction = proto.getMinFraction();
+    float maxFraction = proto.getMaxFraction();
+    return new CartesianProductVertexManagerConfig(isPartitioned, sourceVertices, numPartitions,
+      minFraction, maxFraction, filterDescriptor);
+  }
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/1a068b23/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductVertexManagerPartitioned.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductVertexManagerPartitioned.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductVertexManagerPartitioned.java
new file mode 100644
index 0000000..af2abae
--- /dev/null
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductVertexManagerPartitioned.java
@@ -0,0 +1,176 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.runtime.library.cartesianproduct;
+
+import com.google.common.base.Preconditions;
+import com.google.common.primitives.Ints;
+import org.apache.tez.common.ReflectionUtils;
+import org.apache.tez.dag.api.TezReflectionException;
+import org.apache.tez.dag.api.UserPayload;
+import org.apache.tez.dag.api.VertexManagerPluginContext;
+import org.apache.tez.dag.api.VertexManagerPluginContext.ScheduleTaskRequest;
+import org.apache.tez.dag.api.event.VertexState;
+import org.apache.tez.dag.api.event.VertexStateUpdate;
+import org.apache.tez.runtime.api.TaskAttemptIdentifier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.BitSet;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Starts scheduling tasks when number of completed source tasks crosses
+ * min fraction and schedules all task when max fraction is reached
+ */
+class CartesianProductVertexManagerPartitioned extends CartesianProductVertexManagerReal {
+  private CartesianProductVertexManagerConfig config;
+  private List<String> sourceVertices;
+  private int parallelism = 0;
+  private boolean vertexStarted = false;
+  private boolean vertexReconfigured = false;
+  private int numSourceVertexConfigured = 0;
+  private CartesianProductFilter filter;
+  private Map<String, BitSet> sourceTaskCompleted = new HashMap<>();
+  private int numFinishedSrcTasks = 0;
+  private int totalNumSrcTasks = 0;
+  private int lastScheduledTaskId = -1;
+  private static final Logger LOG =
+    LoggerFactory.getLogger(CartesianProductVertexManagerPartitioned.class);
+
+  public CartesianProductVertexManagerPartitioned(VertexManagerPluginContext context) {
+    super(context);
+  }
+
+  @Override
+  public void initialize(CartesianProductVertexManagerConfig config) throws TezReflectionException {
+    this.config = config;
+    this.sourceVertices = config.getSourceVertices();
+    CartesianProductFilterDescriptor filterDescriptor = config.getFilterDescriptor();
+    if (filterDescriptor != null) {
+      try {
+        filter = ReflectionUtils.createClazzInstance(filterDescriptor.getClassName(),
+          new Class[]{UserPayload.class}, new UserPayload[]{filterDescriptor.getUserPayload()});
+      } catch (TezReflectionException e) {
+        LOG.error("Creating filter failed");
+        throw e;
+      }
+    }
+    for (String sourceVertex : sourceVertices) {
+      sourceTaskCompleted.put(sourceVertex, new BitSet());
+    }
+    for (String vertex : sourceVertices) {
+      getContext().registerForVertexStateUpdates(vertex, EnumSet.of(VertexState.CONFIGURED));
+    }
+    getContext().vertexReconfigurationPlanned();
+  }
+
+  private void reconfigureVertex() throws IOException {
+    // try all combinations, check against filter and get final parallelism
+    Map<String, Integer> vertexPartitionMap = new HashMap<>();
+
+    CartesianProductCombination combination =
+      new CartesianProductCombination(Ints.toArray(config.getNumPartitions()));
+    combination.firstTask();
+    do {
+      for (int i = 0; i < sourceVertices.size(); i++) {
+        vertexPartitionMap.put(sourceVertices.get(i), combination.getCombination().get(i));
+      }
+      if (filter == null || filter.isValidCombination(vertexPartitionMap)) {
+        parallelism++;
+      }
+    } while (combination.nextTask());
+    // no need to reconfigure EM because EM already has all necessary information via config object
+    getContext().reconfigureVertex(parallelism, null, null);
+    vertexReconfigured = true;
+    getContext().doneReconfiguringVertex();
+  }
+
+  @Override
+  public synchronized void onVertexStarted(List<TaskAttemptIdentifier> completions)
+    throws Exception {
+    vertexStarted = true;
+    if (completions != null) {
+      for (TaskAttemptIdentifier attempt : completions) {
+        onSourceTaskCompleted(attempt);
+      }
+    }
+    // try schedule because there may be no more vertex state update and source completions
+    tryScheduleTask();
+  }
+
+  @Override
+  public synchronized void onVertexStateUpdated(VertexStateUpdate stateUpdate) throws IOException{
+    Preconditions.checkArgument(stateUpdate.getVertexState() == VertexState.CONFIGURED);
+    if (!vertexReconfigured) {
+      reconfigureVertex();
+    }
+    numSourceVertexConfigured++;
+    totalNumSrcTasks += getContext().getVertexNumTasks(stateUpdate.getVertexName());
+    // try schedule because there may be no more vertex start and source completions
+    tryScheduleTask();
+  }
+
+  @Override
+  public synchronized void onSourceTaskCompleted(TaskAttemptIdentifier attempt) throws Exception {
+    int taskId = attempt.getTaskIdentifier().getIdentifier();
+    String vertex = attempt.getTaskIdentifier().getVertexIdentifier().getName();
+    BitSet bitSet = this.sourceTaskCompleted.get(vertex);
+    if (!bitSet.get(taskId)) {
+      bitSet.set(taskId);
+      numFinishedSrcTasks++;
+      tryScheduleTask();
+    }
+  }
+
+  /**
+   * schedule task as the ascending order of id. Slow start has same behavior as ShuffleVertexManager
+   */
+  private void tryScheduleTask() {
+    // only schedule task when vertex is already started and all source vertices are configured
+    if (!vertexStarted
+      || numSourceVertexConfigured != sourceVertices.size()) {
+      return;
+    }
+    // determine the destination task with largest id to schedule
+    float percentFinishedSrcTask = numFinishedSrcTasks*1f/totalNumSrcTasks;
+    int numTaskToSchedule;
+    if (percentFinishedSrcTask < config.getMinFraction()) {
+      numTaskToSchedule = 0;
+    } else if (config.getMinFraction() <= percentFinishedSrcTask &&
+        percentFinishedSrcTask <= config.getMaxFraction()) {
+      numTaskToSchedule = (int) ((percentFinishedSrcTask-config.getMinFraction())
+        /(config.getMaxFraction()-config.getMinFraction())*parallelism);
+    } else {
+      numTaskToSchedule = parallelism;
+    }
+    // schedule tasks if there are more we can schedule
+    if (numTaskToSchedule-1 > lastScheduledTaskId) {
+      List<ScheduleTaskRequest> scheduleTaskRequests = new ArrayList<>();
+      for (int i = lastScheduledTaskId + 1; i < numTaskToSchedule; i++) {
+        scheduleTaskRequests.add(ScheduleTaskRequest.create(i, null));
+      }
+      lastScheduledTaskId = numTaskToSchedule-1;
+      getContext().scheduleTasks(scheduleTaskRequests);
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tez/blob/1a068b23/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductVertexManagerReal.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductVertexManagerReal.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductVertexManagerReal.java
new file mode 100644
index 0000000..84e65ac
--- /dev/null
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductVertexManagerReal.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.runtime.library.cartesianproduct;
+
+import org.apache.tez.dag.api.VertexManagerPluginContext;
+import org.apache.tez.dag.api.event.VertexStateUpdate;
+import org.apache.tez.runtime.api.TaskAttemptIdentifier;
+import org.apache.tez.runtime.api.events.VertexManagerEvent;
+
+import java.util.List;
+
+/**
+ * Common parent of the concrete cartesian product vertex manager implementations.
+ * It keeps the plugin context and declares the callbacks an implementation has to provide.
+ */
+abstract class CartesianProductVertexManagerReal {
+  private final VertexManagerPluginContext context;
+
+  /**
+   * @param context plugin context made available to subclasses through {@link #getContext()}
+   */
+  public CartesianProductVertexManagerReal(VertexManagerPluginContext context) {
+    this.context = context;
+  }
+
+  /**
+   * @return the plugin context given at construction
+   */
+  public final VertexManagerPluginContext getContext() {
+    return context;
+  }
+
+  /**
+   * Prepares the implementation with the given config.
+   */
+  public abstract void initialize(CartesianProductVertexManagerConfig config) throws Exception;
+
+  /**
+   * Does nothing by default; subclasses may override.
+   */
+  public void onVertexManagerEventReceived(VertexManagerEvent vmEvent) {}
+
+  /**
+   * Callback for the vertex-started notification.
+   */
+  public abstract void onVertexStarted(List<TaskAttemptIdentifier> completions) throws Exception;
+
+  /**
+   * Callback for a vertex state update.
+   */
+  public abstract void onVertexStateUpdated(VertexStateUpdate stateUpdate) throws Exception;
+
+  /**
+   * Callback for a completed source task attempt.
+   */
+  public abstract void onSourceTaskCompleted(TaskAttemptIdentifier attempt) throws Exception;
+}