You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by mi...@apache.org on 2016/09/06 17:51:15 UTC
[2/2] tez git commit: TEZ-3230. Implement vertex manager and edge
manager of cartesian product edge. (Zhiyuan Yang via mingma)
TEZ-3230. Implement vertex manager and edge manager of cartesian product edge. (Zhiyuan Yang via mingma)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/1a068b23
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/1a068b23
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/1a068b23
Branch: refs/heads/master
Commit: 1a068b2391684563bb53a0720848b7673d8dc46c
Parents: af82469
Author: Ming Ma <mi...@twitter.com>
Authored: Tue Sep 6 10:49:50 2016 -0700
Committer: Ming Ma <mi...@twitter.com>
Committed: Tue Sep 6 10:49:50 2016 -0700
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../tez/dag/app/dag/impl/TaskAttemptImpl.java | 3 +-
.../apache/tez/examples/CartesianProduct.java | 208 ++++++++++++++
tez-runtime-library/findbugs-exclude.xml | 18 ++
tez-runtime-library/pom.xml | 1 +
.../CartesianProductCombination.java | 164 +++++++++++
.../CartesianProductConfig.java | 255 +++++++++++++++++
.../CartesianProductEdgeManager.java | 106 +++++++
.../CartesianProductEdgeManagerConfig.java | 64 +++++
.../CartesianProductEdgeManagerPartitioned.java | 124 ++++++++
.../CartesianProductEdgeManagerReal.java | 62 ++++
...artesianProductEdgeManagerUnpartitioned.java | 98 +++++++
.../CartesianProductFilter.java | 47 +++
.../CartesianProductFilterDescriptor.java | 28 ++
.../CartesianProductVertexManager.java | 139 +++++++++
.../CartesianProductVertexManagerConfig.java | 75 +++++
...artesianProductVertexManagerPartitioned.java | 176 ++++++++++++
.../CartesianProductVertexManagerReal.java | 50 ++++
...tesianProductVertexManagerUnpartitioned.java | 178 ++++++++++++
.../main/proto/CartesianProductPayload.proto | 31 ++
.../TestCartesianProductCombination.java | 110 +++++++
.../TestCartesianProductConfig.java | 106 +++++++
.../TestCartesianProductEdgeManager.java | 68 +++++
...tCartesianProductEdgeManagerPartitioned.java | 284 +++++++++++++++++++
...artesianProductEdgeManagerUnpartitioned.java | 240 ++++++++++++++++
.../TestCartesianProductVertexManager.java | 67 +++++
...artesianProductVertexManagerPartitioned.java | 230 +++++++++++++++
...tesianProductVertexManagerUnpartitioned.java | 194 +++++++++++++
.../org/apache/tez/test/TestFaultTolerance.java | 74 ++++-
29 files changed, 3198 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/1a068b23/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index b73dd3f..0225db6 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -7,6 +7,7 @@ INCOMPATIBLE CHANGES
ALL CHANGES:
+ TEZ-3230. Implement vertex manager and edge manager of cartesian product edge.
TEZ-3326. Display JVM system properties in AM and task logs.
TEZ-3009. Errors that occur during container task acquisition are not logged.
TEZ-2852. TestVertexImpl fails due to race in AsyncDispatcher.
http://git-wip-us.apache.org/repos/asf/tez/blob/1a068b23/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
index e39315b..e5f3e71 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
@@ -1705,8 +1705,7 @@ public class TaskAttemptImpl implements TaskAttempt,
}
int readErrorTimespanSec = (int)((time - firstErrReportTime)/1000);
- boolean crossTimeDeadline = readErrorTimespanSec >=
- MAX_ALLOWED_TIME_FOR_TASK_READ_ERROR_SEC ? true : false;
+ boolean crossTimeDeadline = readErrorTimespanSec >= MAX_ALLOWED_TIME_FOR_TASK_READ_ERROR_SEC;
float failureFraction = ((float) attempt.uniquefailedOutputReports.size())
/ outputFailedEvent.getConsumerTaskNumber();
http://git-wip-us.apache.org/repos/asf/tez/blob/1a068b23/tez-examples/src/main/java/org/apache/tez/examples/CartesianProduct.java
----------------------------------------------------------------------
diff --git a/tez-examples/src/main/java/org/apache/tez/examples/CartesianProduct.java b/tez-examples/src/main/java/org/apache/tez/examples/CartesianProduct.java
new file mode 100644
index 0000000..9f3d490
--- /dev/null
+++ b/tez-examples/src/main/java/org/apache/tez/examples/CartesianProduct.java
@@ -0,0 +1,208 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.examples;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.tez.client.TezClient;
+import org.apache.tez.dag.api.DAG;
+import org.apache.tez.dag.api.Edge;
+import org.apache.tez.dag.api.EdgeManagerPluginDescriptor;
+import org.apache.tez.dag.api.EdgeProperty;
+import org.apache.tez.dag.api.ProcessorDescriptor;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.UserPayload;
+import org.apache.tez.dag.api.Vertex;
+import org.apache.tez.dag.api.VertexManagerPluginDescriptor;
+import org.apache.tez.mapreduce.input.MRInput;
+import org.apache.tez.mapreduce.output.MROutput;
+import org.apache.tez.mapreduce.processor.SimpleMRProcessor;
+import org.apache.tez.runtime.api.ProcessorContext;
+import org.apache.tez.runtime.library.api.KeyValueReader;
+import org.apache.tez.runtime.library.api.KeyValueWriter;
+import org.apache.tez.runtime.library.api.Partitioner;
+import org.apache.tez.runtime.library.cartesianproduct.CartesianProductConfig;
+import org.apache.tez.runtime.library.cartesianproduct.CartesianProductEdgeManager;
+import org.apache.tez.runtime.library.cartesianproduct.CartesianProductVertexManager;
+import org.apache.tez.runtime.library.conf.UnorderedKVEdgeConfig;
+import org.apache.tez.runtime.library.conf.UnorderedPartitionedKVEdgeConfig;
+import org.apache.tez.runtime.library.processor.SimpleProcessor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.StringTokenizer;
+
+/**
+ * This job has three vertices: two Tokenizers and one JoinProcessor. Each Tokenizer handles one
+ * input directory and generates tokens. CustomPartitioner separates tokens into 2 partitions
+ * according to the parity of token's first char. Then JoinProcessor does cartesian product of
+ * partitioned token sets.
+ */
+public class CartesianProduct extends TezExampleBase {
+ private static final String INPUT = "Input1";
+ private static final String OUTPUT = "Output";
+ private static final String VERTEX1 = "Vertex1";
+ private static final String VERTEX2 = "Vertex2";
+ private static final String VERTEX3 = "Vertex3";
+ private static final String PARTITIONED = "-partitioned";
+ private static final String UNPARTITIONED = "-unpartitioned";
+ private static final Logger LOG = LoggerFactory.getLogger(CartesianProduct.class);
+ private static final int numPartition = 2;
+ private static final String[] sourceVertices = new String[] {VERTEX1, VERTEX2};
+
+ public static class TokenProcessor extends SimpleProcessor {
+ public TokenProcessor(ProcessorContext context) {
+ super(context);
+ }
+
+ @Override
+ public void run() throws Exception {
+ Preconditions.checkArgument(getInputs().size() == 1);
+ Preconditions.checkArgument(getOutputs().size() == 1);
+ KeyValueReader kvReader = (KeyValueReader) getInputs().get(INPUT).getReader();
+ KeyValueWriter kvWriter = (KeyValueWriter) getOutputs().get(VERTEX3).getWriter();
+ while (kvReader.next()) {
+ StringTokenizer itr = new StringTokenizer(kvReader.getCurrentValue().toString());
+ while (itr.hasMoreTokens()) {
+ kvWriter.write(new Text(itr.nextToken()), new IntWritable(1));
+ }
+ }
+ }
+ }
+
+ public static class JoinProcessor extends SimpleMRProcessor {
+ public JoinProcessor(ProcessorContext context) {
+ super(context);
+ }
+
+ @Override
+ public void run() throws Exception {
+ KeyValueWriter kvWriter = (KeyValueWriter) getOutputs().get(OUTPUT).getWriter();
+ KeyValueReader kvReader1 = (KeyValueReader) getInputs().get(VERTEX1).getReader();
+ KeyValueReader kvReader2 = (KeyValueReader) getInputs().get(VERTEX2).getReader();
+ Set<String> rightSet = new HashSet<>();
+
+ while (kvReader2.next()) {
+ rightSet.add(kvReader2.getCurrentKey().toString());
+ }
+
+ while (kvReader1.next()) {
+ String left = kvReader1.getCurrentKey().toString();
+ for (String right : rightSet) {
+ kvWriter.write(left, right);
+ }
+ }
+ }
+ }
+
+ public static class CustomPartitioner implements Partitioner { // partitions by key's first char
+ @Override
+ public int getPartition(Object key, Object value, int numPartitions) {
+ return key.toString().charAt(0) % numPartitions; // use the count supplied by the caller
+ }
+ }
+
+ private DAG createDAG(TezConfiguration tezConf, String inputPath1, String inputPath2,
+ String outputPath, boolean isPartitioned) throws IOException {
+ Vertex v1 = Vertex.create(VERTEX1, ProcessorDescriptor.create(TokenProcessor.class.getName()));
+ // turn off groupSplit so that each input file incurs one task
+ v1.addDataSource(INPUT,
+ MRInput.createConfigBuilder(new Configuration(tezConf), TextInputFormat.class, inputPath1)
+ .groupSplits(false).build());
+ Vertex v2 = Vertex.create(VERTEX2, ProcessorDescriptor.create(TokenProcessor.class.getName()));
+ v2.addDataSource(INPUT,
+ MRInput.createConfigBuilder(new Configuration(tezConf), TextInputFormat.class, inputPath2)
+ .groupSplits(false).build());
+ CartesianProductConfig cartesianProductConfig;
+ if (isPartitioned) {
+ Map<String, Integer> vertexPartitionMap = new HashMap<>();
+ for (String vertex : sourceVertices) {
+ vertexPartitionMap.put(vertex, numPartition);
+ }
+ cartesianProductConfig = new CartesianProductConfig(vertexPartitionMap);
+ } else {
+ cartesianProductConfig = new CartesianProductConfig(Arrays.asList(sourceVertices));
+ }
+ UserPayload userPayload = cartesianProductConfig.toUserPayload(tezConf);
+ Vertex v3 = Vertex.create(VERTEX3, ProcessorDescriptor.create(JoinProcessor.class.getName()));
+ v3.addDataSink(OUTPUT,
+ MROutput.createConfigBuilder(new Configuration(tezConf), TextOutputFormat.class, outputPath)
+ .build());
+ v3.setVertexManagerPlugin(
+ VertexManagerPluginDescriptor.create(CartesianProductVertexManager.class.getName())
+ .setUserPayload(userPayload));
+
+ DAG dag = DAG.create("CrossProduct").addVertex(v1).addVertex(v2).addVertex(v3);
+ EdgeManagerPluginDescriptor edgeManagerDescriptor =
+ EdgeManagerPluginDescriptor.create(CartesianProductEdgeManager.class.getName());
+ edgeManagerDescriptor.setUserPayload(userPayload);
+ EdgeProperty edgeProperty;
+ if (isPartitioned) {
+ UnorderedPartitionedKVEdgeConfig edgeConf =
+ UnorderedPartitionedKVEdgeConfig.newBuilder(Text.class.getName(), IntWritable.class.getName(),
+ CustomPartitioner.class.getName()).build();
+ edgeProperty = edgeConf.createDefaultCustomEdgeProperty(edgeManagerDescriptor);
+ } else {
+ UnorderedKVEdgeConfig edgeConf =
+ UnorderedKVEdgeConfig.newBuilder(Text.class.getName(), IntWritable.class.getName()).build();
+ edgeProperty = edgeConf.createDefaultCustomEdgeProperty(edgeManagerDescriptor);
+ }
+ dag.addEdge(Edge.create(v1, v3, edgeProperty)).addEdge(Edge.create(v2, v3, edgeProperty));
+
+ return dag;
+ }
+
+ @Override
+ protected void printUsage() { // writes the expected command-line shape to stderr
+ System.err.println("Usage: args: [" + PARTITIONED + "|" + UNPARTITIONED
+ + "] <input_dir1> <input_dir2> <output_dir>");
+ }
+
+ @Override
+ protected int validateArgs(String[] otherArgs) {
+ return (otherArgs.length != 4 || (!otherArgs[0].equals(PARTITIONED)
+ && !otherArgs[0].equals(UNPARTITIONED))) ? -1 : 0;
+ }
+
+ @Override
+ protected int runJob(String[] args, TezConfiguration tezConf,
+ TezClient tezClient) throws Exception {
+ DAG dag = createDAG(tezConf, args[1], args[2],
+ args[3], args[0].equals(PARTITIONED));
+ return runDag(dag, isCountersLog(), LOG);
+ }
+
+ public static void main(String[] args) throws Exception {
+ int res = ToolRunner.run(new Configuration(), new CartesianProduct(), args);
+ System.exit(res);
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/tez/blob/1a068b23/tez-runtime-library/findbugs-exclude.xml
----------------------------------------------------------------------
diff --git a/tez-runtime-library/findbugs-exclude.xml b/tez-runtime-library/findbugs-exclude.xml
index 4e15edc..d3b6245 100644
--- a/tez-runtime-library/findbugs-exclude.xml
+++ b/tez-runtime-library/findbugs-exclude.xml
@@ -123,6 +123,24 @@
</Match>
<Match>
+ <Class name="org.apache.tez.runtime.library.cartesianproduct.CartesianProductUserPayload$CartesianProductConfigProto"/>
+ <Field name="unknownFields"/>
+ <Bug pattern="SE_BAD_FIELD"/>
+ </Match>
+
+ <Match>
+ <Class name="org.apache.tez.runtime.library.cartesianproduct.CartesianProductUserPayload$CartesianProductConfigProto"/>
+ <Field name="PARSER"/>
+ <Bug pattern="MS_SHOULD_BE_FINAL"/>
+ </Match>
+
+ <Match>
+ <Class name="org.apache.tez.runtime.library.cartesianproduct.CartesianProductUserPayload$CartesianProductConfigProto$Builder"/>
+ <Method name="maybeForceBuilderInitialization"/>
+ <Bug pattern="UCF_USELESS_CONTROL_FLOW"/>
+ </Match>
+
+ <Match>
<Bug pattern="EI_EXPOSE_REP"/>
<Or>
<Class name="org.apache.tez.runtime.library.common.sort.impl.ExteralSorter" />
http://git-wip-us.apache.org/repos/asf/tez/blob/1a068b23/tez-runtime-library/pom.xml
----------------------------------------------------------------------
diff --git a/tez-runtime-library/pom.xml b/tez-runtime-library/pom.xml
index 9831e50..b676933 100644
--- a/tez-runtime-library/pom.xml
+++ b/tez-runtime-library/pom.xml
@@ -129,6 +129,7 @@
<directory>${basedir}/src/main/proto</directory>
<includes>
<include>ShufflePayloads.proto</include>
+ <include>CartesianProductPayload.proto</include>
</includes>
</source>
<output>${project.build.directory}/generated-sources/java</output>
http://git-wip-us.apache.org/repos/asf/tez/blob/1a068b23/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductCombination.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductCombination.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductCombination.java
new file mode 100644
index 0000000..a46993d
--- /dev/null
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductCombination.java
@@ -0,0 +1,164 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.runtime.library.cartesianproduct;
+
+import com.google.common.base.Preconditions;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Represent the combination of source partitions or tasks.
+ *
+ * For example, if we have two source vertices and each generates two partitions, we will have
+ * 2*2=4 destination tasks. The mapping from source partition/task to destination task is like this:
+ * <0, 0> -> 0, <0, 1> -> 1, <1, 0> -> 2, <1, 1> -> 3;
+ *
+ * Basically, it stores the source partition/task combination and can compute corresponding
+ * destination task. It can also figure out the source combination from a given destination task.
+ * Task id is mapped in the ascending order of combinations, starting from 0. <field>factor</field>
+ * is the helper array used to compute the task id, so task id = (combination) dot-product (factor)
+ *
+ * You can traverse all combinations with <method>firstTask</method> and <method>nextTask</method>,
+ * like <0, 0> -> <0, 1> -> <1, 0> -> <1, 1>.
+ *
+ * Or you can also traverse all combinations that have one specific partition with
+ * <method>firstTaskWithFixedPartition</method> and <method>nextTaskWithFixedPartition</method>,
+ * like <0, 1, 0> -> <0, 1, 1> -> <1, 1, 0> -> <1, 1, 1> (all combinations with 2nd vertex's 2nd
+ * partition).
+ */
+class CartesianProductCombination {
+ // numPartitions for partitioned case, numTasks for unpartitioned case
+ private int[] numPartitionOrTask;
+ // at which position (in source vertices array) our vertex is
+ private int positionId = -1;
+ // The i-th element Ci represents partition/task Ci of source vertex i.
+ private final Integer[] combination;
+ // the weight of each vertex when computing the task id
+ private final Integer[] factor;
+
+ public CartesianProductCombination(int[] numPartitionOrTask) {
+ this.numPartitionOrTask = Arrays.copyOf(numPartitionOrTask, numPartitionOrTask.length);
+ combination = new Integer[numPartitionOrTask.length];
+ factor = new Integer[numPartitionOrTask.length];
+ factor[factor.length-1] = 1;
+ for (int i = combination.length-2; i >= 0; i--) {
+ factor[i] = factor[i+1]*numPartitionOrTask[i+1];
+ }
+ }
+
+ public CartesianProductCombination(int[] numPartitionOrTask, int positionId) {
+ this(numPartitionOrTask);
+ this.positionId = positionId;
+ }
+
+ /**
+ * @return a read only view of current combination
+ */
+ public List<Integer> getCombination() {
+ return Collections.unmodifiableList(Arrays.asList(combination));
+ }
+
+ /**
+ * first combination with given partition id in current position
+ * @param partition
+ */
+ public void firstTaskWithFixedPartition(int partition) {
+ Preconditions.checkArgument(positionId >= 0 && positionId < combination.length);
+ Arrays.fill(combination, 0);
+ combination[positionId] = partition;
+ }
+
+ /**
+ * next combination, keeping the partition in current position fixed
+ * @return false if there is no next combination
+ */
+ public boolean nextTaskWithFixedPartition() {
+ Preconditions.checkArgument(positionId >= 0 && positionId < combination.length);
+ int i;
+ for (i = combination.length-1; i >= 0; i--) {
+ if (i != positionId && combination[i] != numPartitionOrTask[i]-1) {
+ break;
+ }
+ }
+
+ if (i < 0) {
+ return false;
+ }
+
+ combination[i]++;
+
+ for (i++; i < combination.length; i++) {
+ if (i != positionId) {
+ combination[i] = 0;
+ }
+ }
+
+ return true;
+ }
+
+ /**
+ * reset to the first combination, i.e. partition/task 0 of every source vertex
+ */
+ public void firstTask() {
+ Arrays.fill(combination, 0);
+ }
+
+ /**
+ * advance to the next combination in ascending order (last position varies fastest)
+ * @return false if there is no next combination
+ */
+ public boolean nextTask() {
+ int i;
+ for (i = combination.length-1; i >= 0; i--) {
+ if (combination[i] != numPartitionOrTask[i]-1) {
+ break;
+ }
+ }
+
+ if (i < 0) {
+ return false;
+ }
+
+ combination[i]++;
+ Arrays.fill(combination, i+1, combination.length, 0);
+ return true;
+ }
+
+ /**
+ * @return corresponding task id for current combination
+ */
+ public int getTaskId() {
+ int taskId = 0;
+ for (int i = 0; i < combination.length; i++) {
+ taskId += combination[i]*factor[i];
+ }
+ return taskId;
+ }
+
+ public static CartesianProductCombination fromTaskId(int[] numPartitionOrTask,
+ int taskId) {
+ CartesianProductCombination result = new CartesianProductCombination(numPartitionOrTask);
+ for (int i = 0; i < result.combination.length; i++) {
+ result.combination[i] = taskId/result.factor[i];
+ taskId %= result.factor[i];
+ }
+ return result;
+ }
+}
http://git-wip-us.apache.org/repos/asf/tez/blob/1a068b23/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductConfig.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductConfig.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductConfig.java
new file mode 100644
index 0000000..b682182
--- /dev/null
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductConfig.java
@@ -0,0 +1,255 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.runtime.library.cartesianproduct;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.primitives.Ints;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.InvalidProtocolBufferException;
+import org.apache.hadoop.classification.InterfaceStability.Evolving;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.UserPayload;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.tez.runtime.library.cartesianproduct.CartesianProductUserPayload.CartesianProductConfigProto;
+
+/**
+ * <class>CartesianProductConfig</class> is used to configure both
+ * <class>CartesianProductVertexManager</class> and <class>CartesianProductEdgeManager</class>.
+ * The user needs to specify at least the vertices and the number of partitions of each vertex's
+ * output. In the partitioned case, a filter can also be specified here (via
+ * <class>CartesianProductFilterDescriptor</class>). The user may also configure min/max fractions
+ * used in slow start.
+ */
+@Evolving
+public class CartesianProductConfig {
+ private final boolean isPartitioned;
+ private final String[] sourceVertices;
+ private final int[] numPartitions;
+ private final CartesianProductFilterDescriptor filterDescriptor;
+
+ /**
+ * create config for unpartitioned case
+ * @param sourceVertices list of source vertices names
+ */
+ public CartesianProductConfig(List<String> sourceVertices) {
+ Preconditions.checkArgument(sourceVertices != null, "source vertices list cannot be null");
+ Preconditions.checkArgument(sourceVertices.size() > 1,
+ "there must be more than 1 source " + "vertices, currently only " + sourceVertices.size());
+
+ this.isPartitioned = false;
+ this.sourceVertices = sourceVertices.toArray(new String[sourceVertices.size()]);
+ this.numPartitions = null;
+ this.filterDescriptor = null;
+ }
+
+ /**
+ * create config for partitioned case without filter
+ * @param vertexPartitionMap the map from vertex name to its number of partitions
+ */
+ public CartesianProductConfig(Map<String, Integer> vertexPartitionMap) {
+ this(vertexPartitionMap, null);
+ }
+
+ /**
+ * create config for partitioned case with filter
+ * @param vertexPartitionMap the map from vertex name to its number of partitions
+ * @param filterDescriptor
+ */
+ public CartesianProductConfig(Map<String, Integer> vertexPartitionMap,
+ CartesianProductFilterDescriptor filterDescriptor) {
+ Preconditions.checkArgument(vertexPartitionMap != null, "vertex-partition map cannot be null");
+ Preconditions.checkArgument(vertexPartitionMap.size() > 1,
+ "there must be more than 1 source " + "vertices, currently only " + vertexPartitionMap.size());
+
+ this.isPartitioned = true;
+ this.numPartitions = new int[vertexPartitionMap.size()];
+ this.sourceVertices = new String[vertexPartitionMap.size()];
+ this.filterDescriptor = filterDescriptor;
+
+ int i = 0;
+ for (Map.Entry<String, Integer> entry : vertexPartitionMap.entrySet()) {
+ this.sourceVertices[i] = entry.getKey();
+ this.numPartitions[i] = entry.getValue();
+ i++;
+ }
+
+ checkNumPartitions();
+ }
+
+ /**
+ * create config for partitioned case, with specified source vertices order
+ * @param numPartitions
+ * @param sourceVertices
+ * @param filterDescriptor
+ */
+ @VisibleForTesting
+ protected CartesianProductConfig(int[] numPartitions, String[] sourceVertices,
+ CartesianProductFilterDescriptor filterDescriptor) {
+ Preconditions.checkArgument(numPartitions != null, "partitions count array can't be null");
+ Preconditions.checkArgument(sourceVertices != null, "source vertices array can't be null");
+ Preconditions.checkArgument(numPartitions.length == sourceVertices.length,
+ "partitions count array(length: " + numPartitions.length + ") and source vertices array " +
+ "(length: " + sourceVertices.length + ") cannot have different length");
+ Preconditions.checkArgument(sourceVertices.length > 1,
+ "there must be more than 1 source " + "vertices, currently only " + sourceVertices.length);
+
+ this.isPartitioned = true;
+ this.numPartitions = numPartitions;
+ this.sourceVertices = sourceVertices;
+ this.filterDescriptor = filterDescriptor;
+
+ checkNumPartitions();
+ }
+
+ /**
+ * create config for both cases, used by subclass
+ */
+ protected CartesianProductConfig(boolean isPartitioned, int[] numPartitions,
+ String[] sourceVertices,
+ CartesianProductFilterDescriptor filterDescriptor) {
+ this.isPartitioned = isPartitioned;
+ this.numPartitions = numPartitions;
+ this.sourceVertices = sourceVertices;
+ this.filterDescriptor = filterDescriptor;
+ }
+
+ @VisibleForTesting
+ protected void checkNumPartitions() { // validates numPartitions against isPartitioned
+ if (isPartitioned) {
+ boolean isUnpartitioned = true; // stays true only if every vertex has exactly 1 partition
+ for (int i = 0; i < numPartitions.length; i++) {
+ Preconditions.checkArgument(this.numPartitions[i] > 0,
+ "Vertex " + sourceVertices[i] + " has non-positive (" + numPartitions[i] + ") partitions");
+ isUnpartitioned = isUnpartitioned && numPartitions[i] == 1;
+ }
+ Preconditions.checkArgument(!isUnpartitioned,
+ "every source vertex has 1 partition in a partitioned case");
+ } else {
+ Preconditions.checkArgument(this.numPartitions == null,
+ "partition counts should be null in unpartitioned case");
+ }
+ }
+
+ /**
+ * @return an unmodifiable list of source vertex names
+ */
+ public List<String> getSourceVertices() {
+ return Collections.unmodifiableList(Arrays.asList(sourceVertices));
+ }
+
+ /**
+ * @return an unmodifiable list of partition counts in the same order as the result of
+ * <method>getSourceVertices</method>, or null if no partition counts are set
+ */
+ public List<Integer> getNumPartitions() {
+ if (this.numPartitions == null) {
+ return null;
+ }
+ return Collections.unmodifiableList(Ints.asList(this.numPartitions));
+ }
+
+ public boolean getIsPartitioned() {
+ return isPartitioned;
+ }
+
+ public CartesianProductFilterDescriptor getFilterDescriptor() {
+ return this.filterDescriptor;
+ }
+
+ public UserPayload toUserPayload(TezConfiguration conf) throws IOException {
+ return UserPayload.create(ByteBuffer.wrap(toProto(conf).toByteArray()));
+ }
+
+ protected CartesianProductConfigProto toProto(TezConfiguration conf) {
+ CartesianProductConfigProto.Builder builder =
+ CartesianProductConfigProto.newBuilder();
+ builder.setIsPartitioned(this.isPartitioned)
+ .addAllSourceVertices(Arrays.asList(sourceVertices));
+ // partition counts and the optional filter are only serialized in the partitioned case
+ if (isPartitioned) {
+ builder.addAllNumPartitions(Ints.asList(numPartitions));
+ if (filterDescriptor != null) {
+ builder.setFilterClassName(filterDescriptor.getClassName());
+ UserPayload filterUserPayload = filterDescriptor.getUserPayload();
+ if (filterUserPayload != null && filterUserPayload.getPayload() != null) {
+ builder.setFilterUserPayload(ByteString.copyFrom(filterUserPayload.getPayload()));
+ }
+ }
+ }
+ // start from the default slow-start fractions; conf (if given) overrides them below
+ builder.setMinFraction(
+ CartesianProductVertexManager.TEZ_CAERESIAN_PRODUCT_SLOW_START_MIN_FRACTION_DEFAULT);
+ builder.setMaxFraction(
+ CartesianProductVertexManager.TEZ_CAERESIAN_PRODUCT_SLOW_START_MAX_FRACTION_DEFAULT);
+
+ if (conf != null) {
+ builder.setMinFraction(conf.getFloat(
+ CartesianProductVertexManager.TEZ_CAERESIAN_PRODUCT_SLOW_START_MIN_FRACTION,
+ CartesianProductVertexManager.TEZ_CAERESIAN_PRODUCT_SLOW_START_MIN_FRACTION_DEFAULT));
+ builder.setMaxFraction(conf.getFloat(
+ CartesianProductVertexManager.TEZ_CAERESIAN_PRODUCT_SLOW_START_MAX_FRACTION,
+ CartesianProductVertexManager.TEZ_CAERESIAN_PRODUCT_SLOW_START_MAX_FRACTION_DEFAULT));
+ }
+ Preconditions.checkArgument(builder.getMinFraction() <= builder.getMaxFraction(),
+ "min fraction(" + builder.getMinFraction() + ") should be less than max fraction(" +
+ builder.getMaxFraction() + ") in cartesian product slow start");
+
+ return builder.build();
+ }
+
+ protected static CartesianProductConfigProto userPayloadToProto(UserPayload payload)
+ throws InvalidProtocolBufferException {
+ Preconditions.checkArgument(payload != null, "UserPayload is null");
+ Preconditions.checkArgument(payload.getPayload() != null, "UserPayload carries null payload");
+ return // parse the serialized config out of the payload bytes
+ CartesianProductConfigProto.parseFrom(ByteString.copyFrom(payload.getPayload()));
+ }
+
+ protected static CartesianProductConfig fromUserPayload(UserPayload payload)
+ throws InvalidProtocolBufferException {
+ return fromProto(userPayloadToProto(payload));
+ }
+
+ protected static CartesianProductConfig fromProto(
+ CartesianProductConfigProto proto) {
+ if (!proto.getIsPartitioned()) {
+ return new CartesianProductConfig(proto.getSourceVerticesList());
+ } else {
+ String[] sourceVertices = new String[proto.getSourceVerticesList().size()];
+ proto.getSourceVerticesList().toArray(sourceVertices);
+ CartesianProductFilterDescriptor filterDescriptor = null;
+ if (proto.hasFilterClassName()) {
+ filterDescriptor = new CartesianProductFilterDescriptor(proto.getFilterClassName());
+ if (proto.hasFilterUserPayload()) {
+ filterDescriptor.setUserPayload(
+ UserPayload.create(ByteBuffer.wrap(proto.getFilterUserPayload().toByteArray())));
+ }
+ }
+ return new CartesianProductConfig(Ints.toArray(proto.getNumPartitionsList()),
+ sourceVertices, filterDescriptor);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/tez/blob/1a068b23/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductEdgeManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductEdgeManager.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductEdgeManager.java
new file mode 100644
index 0000000..96cce94
--- /dev/null
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductEdgeManager.java
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.runtime.library.cartesianproduct;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import org.apache.tez.dag.api.EdgeManagerPluginContext;
+import org.apache.tez.dag.api.EdgeManagerPluginOnDemand;
+import org.apache.tez.dag.api.TezException;
+
+import javax.annotation.Nullable;
+
+/**
+ * This EM wrap a real edge manager implementation object. It choose whether it's partitioned or
+ * unpartitioned implementation according to the config. All method invocations are actually
+ * redirected to real implementation.
+ */
+public class CartesianProductEdgeManager extends EdgeManagerPluginOnDemand {
+ private CartesianProductEdgeManagerReal edgeManagerReal;
+
+ public CartesianProductEdgeManager(EdgeManagerPluginContext context) {
+ super(context);
+ }
+
+ @Override
+ public void initialize() throws Exception {
+ Preconditions.checkArgument(getContext().getUserPayload() != null);
+ CartesianProductEdgeManagerConfig config = CartesianProductEdgeManagerConfig.fromUserPayload(
+ getContext().getUserPayload());
+ // no need to check config because config comes from VM and is already checked by VM
+ edgeManagerReal = config.getIsPartitioned()
+ ? new CartesianProductEdgeManagerPartitioned(getContext())
+ : new CartesianProductEdgeManagerUnpartitioned(getContext());
+ edgeManagerReal.initialize(config);
+ }
+
+ @VisibleForTesting
+ protected CartesianProductEdgeManagerReal getEdgeManagerReal() {
+ return this.edgeManagerReal;
+ }
+
+ @Override
+ public void prepareForRouting() throws Exception {
+ edgeManagerReal.prepareForRouting();
+ }
+
+ @Override
+ public int routeInputErrorEventToSource(int destTaskId, int failedInputId) throws Exception {
+ return edgeManagerReal.routeInputErrorEventToSource(destTaskId, failedInputId);
+ }
+
+ @Nullable
+ @Override
+ public EventRouteMetadata routeDataMovementEventToDestination(int srcTaskId,
+ int srcOutputId,
+ int destTaskId)
+ throws Exception {
+ return edgeManagerReal.routeDataMovementEventToDestination(srcTaskId, srcOutputId, destTaskId);
+ }
+
+ @Nullable
+ @Override
+ public EventRouteMetadata routeCompositeDataMovementEventToDestination(int srcTaskId,
+ int destTaskId)
+ throws Exception {
+ return edgeManagerReal.routeCompositeDataMovementEventToDestination(srcTaskId, destTaskId);
+ }
+
+ @Nullable
+ @Override
+ public EventRouteMetadata routeInputSourceTaskFailedEventToDestination(int srcTaskId,
+ int destTaskId)
+ throws Exception {
+ return edgeManagerReal.routeInputSourceTaskFailedEventToDestination(srcTaskId, destTaskId);
+ }
+
+ @Override
+ public int getNumDestinationTaskPhysicalInputs(int destTaskId) {
+ return edgeManagerReal.getNumDestinationTaskPhysicalInputs(destTaskId);
+ }
+
+ @Override
+ public int getNumSourceTaskPhysicalOutputs(int srcTaskId) {
+ return edgeManagerReal.getNumSourceTaskPhysicalOutputs(srcTaskId);
+ }
+
+ @Override
+ public int getNumDestinationConsumerTasks(int sourceTaskIndex) {
+ return edgeManagerReal.getNumDestinationConsumerTasks(sourceTaskIndex);
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/tez/blob/1a068b23/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductEdgeManagerConfig.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductEdgeManagerConfig.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductEdgeManagerConfig.java
new file mode 100644
index 0000000..d48a0bb
--- /dev/null
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductEdgeManagerConfig.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.runtime.library.cartesianproduct;
+
+import com.google.common.primitives.Ints;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.InvalidProtocolBufferException;
+import org.apache.tez.dag.api.UserPayload;
+
+import java.nio.ByteBuffer;
+
+import static org.apache.tez.runtime.library.cartesianproduct.CartesianProductUserPayload.CartesianProductConfigProto;
+
+/**
+ * Config consumed by the cartesian product edge managers. On top of the base config it carries
+ * the per-source task counts read from the proto.
+ */
+class CartesianProductEdgeManagerConfig extends CartesianProductConfig {
+  /** Task counts taken from the proto; null when the proto carries none. */
+  private final int[] numTasks;
+
+  protected CartesianProductEdgeManagerConfig(boolean isPartitioned, String[] sourceVertices,
+                                              int[] numPartitions, int[] numTasks,
+                                              CartesianProductFilterDescriptor filterDescriptor) {
+    super(isPartitioned, numPartitions, sourceVertices, filterDescriptor);
+    this.numTasks = numTasks;
+  }
+
+  /**
+   * @return the task counts array held by this config (not a copy), or null if none was set
+   */
+  public int[] getNumTasks() {
+    return this.numTasks;
+  }
+
+  /**
+   * Deserializes an edge manager config from a user payload.
+   *
+   * @param payload user payload wrapping a serialized CartesianProductConfigProto
+   * @return the config described by the payload; the partition counts and task counts are null
+   *         when the corresponding proto lists are empty
+   * @throws InvalidProtocolBufferException if the payload bytes cannot be parsed
+   */
+  public static CartesianProductEdgeManagerConfig fromUserPayload(UserPayload payload)
+    throws InvalidProtocolBufferException {
+    CartesianProductConfigProto proto =
+      CartesianProductConfigProto.parseFrom(ByteString.copyFrom(payload.getPayload()));
+
+    boolean isPartitioned = proto.getIsPartitioned();
+    String[] sourceVertices = new String[proto.getSourceVerticesList().size()];
+    proto.getSourceVerticesList().toArray(sourceVertices);
+    int[] numPartitions =
+      proto.getNumPartitionsCount() == 0 ? null : Ints.toArray(proto.getNumPartitionsList());
+
+    // The filter payload is only meaningful together with a filter class. Nesting the payload
+    // check avoids dereferencing a null descriptor when the proto carries a payload but no
+    // class name, and matches how CartesianProductConfig.fromProto treats the same fields.
+    CartesianProductFilterDescriptor filterDescriptor = null;
+    if (proto.hasFilterClassName()) {
+      filterDescriptor = new CartesianProductFilterDescriptor(proto.getFilterClassName());
+      if (proto.hasFilterUserPayload()) {
+        filterDescriptor.setUserPayload(
+          UserPayload.create(ByteBuffer.wrap(proto.getFilterUserPayload().toByteArray())));
+      }
+    }
+
+    int[] numTasks =
+      proto.getNumTasksCount() == 0 ? null : Ints.toArray(proto.getNumTasksList());
+    return new CartesianProductEdgeManagerConfig(isPartitioned, sourceVertices, numPartitions,
+      numTasks, filterDescriptor);
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/tez/blob/1a068b23/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductEdgeManagerPartitioned.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductEdgeManagerPartitioned.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductEdgeManagerPartitioned.java
new file mode 100644
index 0000000..644d5af
--- /dev/null
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductEdgeManagerPartitioned.java
@@ -0,0 +1,124 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.runtime.library.cartesianproduct;
+
+import com.google.common.primitives.Ints;
+import org.apache.tez.common.ReflectionUtils;
+import org.apache.tez.dag.api.EdgeManagerPluginContext;
+import org.apache.tez.dag.api.EdgeManagerPluginOnDemand.EventRouteMetadata;
+import org.apache.tez.dag.api.UserPayload;
+
+import javax.annotation.Nullable;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Edge manager implementation for the partitioned cartesian product. Destination task ids are
+ * translated through a mapping that keeps only the combinations accepted by the optional filter.
+ */
+class CartesianProductEdgeManagerPartitioned extends CartesianProductEdgeManagerReal {
+  /** Index of this edge's source vertex within the configured source vertex list. */
+  private int positionId;
+  /** Optional user filter; null when no filter descriptor is configured. */
+  private CartesianProductFilter filter;
+  /** Maps a real destination task id to the task id of its combination before filtering. */
+  private int[] taskIdMapping;
+  private CartesianProductEdgeManagerConfig config;
+  private int[] numPartitions;
+
+  public CartesianProductEdgeManagerPartitioned(EdgeManagerPluginContext context) {
+    super(context);
+  }
+
+  /**
+   * Stores the config, resolves this edge's position among the source vertices, instantiates
+   * the filter (if one is configured) and builds the task id mapping.
+   */
+  @Override
+  public void initialize(CartesianProductEdgeManagerConfig config) throws Exception {
+    this.config = config;
+    this.numPartitions = Ints.toArray(config.getNumPartitions());
+    this.positionId = config.getSourceVertices().indexOf(getContext().getSourceVertexName());
+
+    CartesianProductFilterDescriptor descriptor = config.getFilterDescriptor();
+    if (descriptor != null) {
+      // the filter class is expected to expose a constructor taking a single UserPayload
+      filter = ReflectionUtils.createClazzInstance(descriptor.getClassName(),
+        new Class[] {UserPayload.class}, new UserPayload[] {descriptor.getUserPayload()});
+    }
+    generateTaskIdMapping();
+  }
+
+  /** The failed input index is returned as the source task id. */
+  @Override
+  public int routeInputErrorEventToSource(int destTaskId, int failedInputId) throws Exception {
+    return failedInputId;
+  }
+
+  /**
+   * Routes the event only when the source output is the partition this destination task reads
+   * from this edge; the target input index is the source task id.
+   */
+  @Override
+  public EventRouteMetadata routeDataMovementEventToDestination(int srcTaskId, int srcOutputId,
+                                                                int destTaskId) throws Exception {
+    int partition = partitionReadBy(destTaskId);
+    if (srcOutputId != partition) {
+      return null;
+    }
+    return EventRouteMetadata.create(1, new int[]{srcTaskId});
+  }
+
+  @Nullable
+  @Override
+  public EventRouteMetadata routeCompositeDataMovementEventToDestination(int srcTaskId,
+                                                                         int destTaskId)
+    throws Exception {
+    int partition = partitionReadBy(destTaskId);
+    return EventRouteMetadata.create(1, new int[]{srcTaskId}, new int[]{partition});
+  }
+
+  @Nullable
+  @Override
+  public EventRouteMetadata routeInputSourceTaskFailedEventToDestination(int srcTaskId,
+                                                                         int destTaskId)
+    throws Exception {
+    return EventRouteMetadata.create(1, new int[]{srcTaskId});
+  }
+
+  @Override
+  public int getNumDestinationTaskPhysicalInputs(int destTaskId) {
+    return getContext().getSourceVertexNumTasks();
+  }
+
+  @Override
+  public int getNumSourceTaskPhysicalOutputs(int srcTaskId) {
+    return numPartitions[positionId];
+  }
+
+  @Override
+  public int getNumDestinationConsumerTasks(int sourceTaskIndex) {
+    return getContext().getDestinationVertexNumTasks();
+  }
+
+  /**
+   * Walks every combination of partitions in order and records the task id of each one the
+   * filter accepts (all of them when there is no filter).
+   */
+  private void generateTaskIdMapping() {
+    List<String> vertexNames = config.getSourceVertices();
+    Map<String, Integer> partitionByVertex = new HashMap<>();
+    List<Integer> acceptedTaskIds = new ArrayList<>();
+
+    CartesianProductCombination cursor = new CartesianProductCombination(numPartitions);
+    cursor.firstTask();
+    do {
+      // the map is reused across iterations; every key is overwritten each time
+      for (int pos = 0; pos < vertexNames.size(); pos++) {
+        partitionByVertex.put(vertexNames.get(pos), cursor.getCombination().get(pos));
+      }
+      boolean accepted = filter == null || filter.isValidCombination(partitionByVertex);
+      if (accepted) {
+        acceptedTaskIds.add(cursor.getTaskId());
+      }
+    } while (cursor.nextTask());
+
+    this.taskIdMapping = Ints.toArray(acceptedTaskIds);
+  }
+
+  /**
+   * Looks up the combination behind a real destination task id and returns the partition it
+   * holds at this edge's position.
+   */
+  private int partitionReadBy(int destTaskId) {
+    int idealTaskId = taskIdMapping[destTaskId];
+    return CartesianProductCombination.fromTaskId(numPartitions, idealTaskId)
+      .getCombination().get(positionId);
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/tez/blob/1a068b23/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductEdgeManagerReal.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductEdgeManagerReal.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductEdgeManagerReal.java
new file mode 100644
index 0000000..705db05
--- /dev/null
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductEdgeManagerReal.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.runtime.library.cartesianproduct;
+
+import org.apache.tez.dag.api.EdgeManagerPluginContext;
+import org.apache.tez.dag.api.EdgeManagerPluginOnDemand.EventRouteMetadata;
+
+/**
+ * Base class of the cartesian product edge manager implementations. CartesianProductEdgeManager
+ * forwards each of its calls to the method of the same name declared here.
+ */
+abstract class CartesianProductEdgeManagerReal {
+  private final EdgeManagerPluginContext context;
+
+  public CartesianProductEdgeManagerReal(EdgeManagerPluginContext context) {
+    this.context = context;
+  }
+
+  /** @return the plugin context supplied at construction time */
+  public EdgeManagerPluginContext getContext() {
+    return context;
+  }
+
+  /** Initializes the implementation from the given edge manager config. */
+  public abstract void initialize(CartesianProductEdgeManagerConfig config) throws Exception;
+
+  /** Does nothing by default; implementations may override. */
+  public void prepareForRouting() throws Exception {}
+
+  public abstract int routeInputErrorEventToSource(int destTaskId, int failedInputId)
+    throws Exception;
+
+  public abstract EventRouteMetadata routeDataMovementEventToDestination(
+    int srcTaskId, int srcOutputId, int destTaskId) throws Exception;
+
+  public abstract EventRouteMetadata routeCompositeDataMovementEventToDestination(
+    int srcTaskId, int destTaskId) throws Exception;
+
+  public abstract EventRouteMetadata routeInputSourceTaskFailedEventToDestination(
+    int srcTaskId, int destTaskId) throws Exception;
+
+  public abstract int getNumDestinationTaskPhysicalInputs(int destTaskId);
+
+  public abstract int getNumSourceTaskPhysicalOutputs(int srcTaskId);
+
+  public abstract int getNumDestinationConsumerTasks(int sourceTaskIndex);
+}
http://git-wip-us.apache.org/repos/asf/tez/blob/1a068b23/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductEdgeManagerUnpartitioned.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductEdgeManagerUnpartitioned.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductEdgeManagerUnpartitioned.java
new file mode 100644
index 0000000..cea4142
--- /dev/null
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductEdgeManagerUnpartitioned.java
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.runtime.library.cartesianproduct;
+
+import org.apache.tez.dag.api.EdgeManagerPluginContext;
+import org.apache.tez.dag.api.EdgeManagerPluginOnDemand;
+
+import javax.annotation.Nullable;
+import java.util.Arrays;
+
+import static org.apache.tez.dag.api.EdgeManagerPluginOnDemand.*;
+
+class CartesianProductEdgeManagerUnpartitioned extends CartesianProductEdgeManagerReal {
+ private int positionId;
+ private int[] numTasks;
+ private int numDestinationConsumerTasks;
+
+ public CartesianProductEdgeManagerUnpartitioned(EdgeManagerPluginContext context) {
+ super(context);
+ }
+
+ public void initialize(CartesianProductEdgeManagerConfig config) {
+ positionId = config.getSourceVertices().indexOf(getContext().getSourceVertexName());
+ this.numTasks = config.getNumTasks();
+
+ if (numTasks != null && numTasks[positionId] != 0) {
+ numDestinationConsumerTasks = 1;
+ for (int numTask : numTasks) {
+ numDestinationConsumerTasks *= numTask;
+ }
+ numDestinationConsumerTasks /= numTasks[positionId];
+ }
+ }
+
+ @Override
+ public int routeInputErrorEventToSource(int destTaskId, int failedInputId) throws Exception {
+ return
+ CartesianProductCombination.fromTaskId(numTasks, destTaskId).getCombination().get(positionId);
+ }
+
+ @Override
+ public EventRouteMetadata routeDataMovementEventToDestination(int srcTaskId, int srcOutputId,
+ int destTaskId) throws Exception {
+ int index = CartesianProductCombination.fromTaskId(numTasks, destTaskId)
+ .getCombination().get(positionId);
+ return index == srcTaskId ? EventRouteMetadata.create(1, new int[]{0}) : null;
+ }
+
+ @Nullable
+ @Override
+ public EventRouteMetadata routeCompositeDataMovementEventToDestination(int srcTaskId,
+ int destTaskId)
+ throws Exception {
+ int index = CartesianProductCombination.fromTaskId(numTasks, destTaskId)
+ .getCombination().get(positionId);
+ return index == srcTaskId ? EventRouteMetadata.create(1, new int[]{0}, new int[]{0}) : null;
+ }
+
+ @Nullable
+ @Override
+ public EventRouteMetadata routeInputSourceTaskFailedEventToDestination(int srcTaskId,
+ int destTaskId)
+ throws Exception {
+ int index = CartesianProductCombination.fromTaskId(numTasks, destTaskId)
+ .getCombination().get(positionId);
+ return index == srcTaskId ? EventRouteMetadata.create(1, new int[]{0}) : null;
+ }
+
+ @Override
+ public int getNumDestinationTaskPhysicalInputs(int destTaskId) {
+ return 1;
+ }
+
+ @Override
+ public int getNumSourceTaskPhysicalOutputs(int srcTaskId) {
+ return 1;
+ }
+
+ @Override
+ public int getNumDestinationConsumerTasks(int sourceTaskIndex) {
+ return numDestinationConsumerTasks;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/tez/blob/1a068b23/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductFilter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductFilter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductFilter.java
new file mode 100644
index 0000000..5b6456e
--- /dev/null
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductFilter.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.runtime.library.cartesianproduct;
+
+import org.apache.tez.dag.api.UserPayload;
+
+import java.util.Map;
+
+import static org.apache.hadoop.classification.InterfaceStability.Evolving;
+
+/**
+ * Base class for custom cartesian product filters. Extend it and override
+ * {@link #isValidCombination(Map)} to decide which combinations of partitions are kept.
+ */
+@Evolving
+public abstract class CartesianProductFilter {
+  /** Payload handed to the constructor; never reassigned afterwards. */
+  private final UserPayload userPayload;
+
+  /**
+   * @param payload user payload made available to the filter through {@link #getUserPayload()}
+   */
+  public CartesianProductFilter(UserPayload payload) {
+    this.userPayload = payload;
+  }
+
+  /**
+   * @param vertexPartitionMap the map from vertex name to partition id
+   * @return whether this combination of partitions is valid
+   */
+  public abstract boolean isValidCombination(Map<String, Integer> vertexPartitionMap);
+
+  /**
+   * @return the payload this filter was constructed with
+   */
+  public UserPayload getUserPayload() {
+    return userPayload;
+  }
+}
http://git-wip-us.apache.org/repos/asf/tez/blob/1a068b23/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductFilterDescriptor.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductFilterDescriptor.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductFilterDescriptor.java
new file mode 100644
index 0000000..bc81755
--- /dev/null
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductFilterDescriptor.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.runtime.library.cartesianproduct;
+
+import org.apache.tez.dag.api.EntityDescriptor;
+
+public class CartesianProductFilterDescriptor
+ extends EntityDescriptor<CartesianProductFilterDescriptor> {
+
+ public CartesianProductFilterDescriptor(String filterClassName) {
+ super(filterClassName);
+ }
+}
http://git-wip-us.apache.org/repos/asf/tez/blob/1a068b23/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductVertexManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductVertexManager.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductVertexManager.java
new file mode 100644
index 0000000..659d3b7
--- /dev/null
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductVertexManager.java
@@ -0,0 +1,139 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.runtime.library.cartesianproduct;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import org.apache.tez.dag.api.EdgeProperty;
+import org.apache.tez.dag.api.InputDescriptor;
+import org.apache.tez.dag.api.TezException;
+import org.apache.tez.dag.api.VertexManagerPlugin;
+import org.apache.tez.dag.api.VertexManagerPluginContext;
+import org.apache.tez.dag.api.event.VertexStateUpdate;
+import org.apache.tez.runtime.api.Event;
+import org.apache.tez.runtime.api.TaskAttemptIdentifier;
+import org.apache.tez.runtime.api.events.VertexManagerEvent;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Vertex manager that wraps a real vertex manager implementation. During {@link #initialize()}
+ * it validates the config against the DAG, picks the partitioned or the unpartitioned
+ * implementation according to the config, and afterwards forwards calls to that implementation.
+ */
+public class CartesianProductVertexManager extends VertexManagerPlugin {
+  // NOTE: "CAERESIAN" in the constant names is a misspelling of "CARTESIAN"; the names are kept
+  // as they are because they are public.
+  public static final String TEZ_CAERESIAN_PRODUCT_SLOW_START_MIN_FRACTION =
+    "tez.cartesian-product.min-src-fraction";
+  public static final float TEZ_CAERESIAN_PRODUCT_SLOW_START_MIN_FRACTION_DEFAULT = 0.25f;
+  // the max fraction needs its own key; it previously reused the min fraction key
+  public static final String TEZ_CAERESIAN_PRODUCT_SLOW_START_MAX_FRACTION =
+    "tez.cartesian-product.max-src-fraction";
+  public static final float TEZ_CAERESIAN_PRODUCT_SLOW_START_MAX_FRACTION_DEFAULT = 0.75f;
+
+  /** The implementation calls are forwarded to; assigned in {@link #initialize()}. */
+  private CartesianProductVertexManagerReal vertexManagerReal = null;
+
+  public CartesianProductVertexManager(VertexManagerPluginContext context) {
+    super(context);
+  }
+
+  /**
+   * Parses the config from the user payload, verifies that the set of source vertices in the
+   * config is exactly the set of input vertices connected through a
+   * {@link CartesianProductEdgeManager}, then creates and initializes the real implementation.
+   *
+   * @throws IllegalArgumentException if the DAG and the config are inconsistent
+   */
+  @Override
+  public void initialize() throws Exception {
+    CartesianProductVertexManagerConfig config =
+      CartesianProductVertexManagerConfig.fromUserPayload(getContext().getUserPayload());
+    // check whether DAG and config are consistent
+    Map<String, EdgeProperty> edgePropertyMap = getContext().getInputVertexEdgeProperties();
+    Set<String> sourceVerticesDAG = edgePropertyMap.keySet();
+    Set<String> sourceVerticesConfig = new HashSet<>(config.getSourceVertices());
+
+    // Every input vertex must be listed in the config if and only if its edge uses
+    // CartesianProductEdgeManager. Membership is tested against the config's vertex set;
+    // testing against the DAG's own key set would be vacuously true.
+    for (Map.Entry<String, EdgeProperty> entry : edgePropertyMap.entrySet()) {
+      String vertex = entry.getKey();
+      if (hasCartesianProductEdgeManager(entry.getValue())) {
+        Preconditions.checkArgument(sourceVerticesConfig.contains(vertex),
+          vertex + " has CartesianProductEdgeManager but isn't in " +
+            "CartesianProductVertexManagerConfig");
+      } else {
+        Preconditions.checkArgument(!sourceVerticesConfig.contains(vertex),
+          vertex + " has no CartesianProductEdgeManager but is in " +
+            "CartesianProductVertexManagerConfig");
+      }
+    }
+
+    // Every vertex named in the config must be an input vertex with a cartesian product edge.
+    for (String vertex : sourceVerticesConfig) {
+      Preconditions.checkArgument(sourceVerticesDAG.contains(vertex),
+        vertex + " is in CartesianProductVertexManagerConfig but not a source vertex in DAG");
+      Preconditions.checkArgument(
+        hasCartesianProductEdgeManager(edgePropertyMap.get(vertex)),
+        vertex + " is in CartesianProductVertexManagerConfig and a source vertex, but has no " +
+          "CartesianProductEdgeManager");
+    }
+
+    vertexManagerReal = config.getIsPartitioned()
+      ? new CartesianProductVertexManagerPartitioned(getContext())
+      : new CartesianProductVertexManagerUnpartitioned(getContext());
+    vertexManagerReal.initialize(config);
+  }
+
+  /**
+   * Tells whether an edge is managed by {@link CartesianProductEdgeManager}. An edge without an
+   * edge manager descriptor is treated as not being a cartesian product edge.
+   */
+  private static boolean hasCartesianProductEdgeManager(EdgeProperty edgeProperty) {
+    return edgeProperty.getEdgeManagerDescriptor() != null
+      && CartesianProductEdgeManager.class.getName().equals(
+        edgeProperty.getEdgeManagerDescriptor().getClassName());
+  }
+
+  /** Exposes the wrapped implementation so tests can inspect it. */
+  @VisibleForTesting
+  protected CartesianProductVertexManagerReal getVertexManagerReal() {
+    return this.vertexManagerReal;
+  }
+
+  /**
+   * no op currently, will be used for locality based optimization in future
+   * @param vmEvent
+   * @throws Exception
+   */
+  @Override
+  public void onVertexManagerEventReceived(VertexManagerEvent vmEvent) throws Exception {
+    vertexManagerReal.onVertexManagerEventReceived(vmEvent);
+  }
+
+  /**
+   * Currently direct input to cartesian product vertex is not supported
+   * @param inputName
+   * @param inputDescriptor
+   * @param events
+   * @throws Exception always, as a {@link TezException}
+   */
+  @Override
+  public void onRootVertexInitialized(String inputName, InputDescriptor inputDescriptor,
+                                      List<Event> events) throws Exception {
+    throw new TezException("Direct input to cartesian product vertex is not supported yet");
+  }
+
+  @Override
+  public void onVertexStarted(List<TaskAttemptIdentifier> completions) throws Exception {
+    vertexManagerReal.onVertexStarted(completions);
+  }
+
+  @Override
+  public void onVertexStateUpdated(VertexStateUpdate stateUpdate) throws Exception {
+    vertexManagerReal.onVertexStateUpdated(stateUpdate);
+  }
+
+  @Override
+  public void onSourceTaskCompleted(TaskAttemptIdentifier attempt) throws Exception {
+    vertexManagerReal.onSourceTaskCompleted(attempt);
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/tez/blob/1a068b23/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductVertexManagerConfig.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductVertexManagerConfig.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductVertexManagerConfig.java
new file mode 100644
index 0000000..b324524
--- /dev/null
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductVertexManagerConfig.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.runtime.library.cartesianproduct;
+
+import com.google.common.base.Preconditions;
+import com.google.common.primitives.Ints;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.InvalidProtocolBufferException;
+import org.apache.tez.dag.api.UserPayload;
+
+import java.nio.ByteBuffer;
+
+import static org.apache.tez.runtime.library.cartesianproduct.CartesianProductUserPayload.*;
+
+/**
+ * Cartesian product configuration extended with the slow start fractions
+ * (min fraction and max fraction) consumed by the vertex manager.
+ */
+class CartesianProductVertexManagerConfig extends CartesianProductConfig {
+  private final float minFraction;
+  private final float maxFraction;
+
+  /**
+   * @param isPartitioned passed through to {@link CartesianProductConfig}
+   * @param sourceVertices passed through to {@link CartesianProductConfig}
+   * @param numPartitions passed through to {@link CartesianProductConfig}
+   * @param minFraction slow start min fraction, must not be greater than {@code maxFraction}
+   * @param maxFraction slow start max fraction
+   * @param filterDescriptor passed through to {@link CartesianProductConfig}, may be null
+   * @throws IllegalArgumentException if {@code minFraction <= maxFraction} does not hold
+   *         (this includes NaN values)
+   */
+  public CartesianProductVertexManagerConfig(boolean isPartitioned, String[] sourceVertices,
+                                             int[] numPartitions,
+                                             float minFraction, float maxFraction,
+                                             CartesianProductFilterDescriptor filterDescriptor) {
+    super(isPartitioned, numPartitions, sourceVertices, filterDescriptor);
+    Preconditions.checkArgument(minFraction <= maxFraction,
+      "min fraction(" + minFraction + ") should be less than max fraction(" +
+        maxFraction + ") in cartesian product slow start");
+    this.minFraction = minFraction;
+    this.maxFraction = maxFraction;
+  }
+
+  /**
+   * @return slow start min fraction
+   */
+  public float getMinFraction() {
+    return minFraction;
+  }
+
+  /**
+   * @return slow start max fraction
+   */
+  public float getMaxFraction() {
+    return maxFraction;
+  }
+
+  /**
+   * Builds a config object out of a serialized {@code CartesianProductConfigProto}.
+   * @param payload user payload holding the serialized proto
+   * @return config populated from the proto fields
+   * @throws InvalidProtocolBufferException if the payload cannot be parsed as the proto
+   */
+  public static CartesianProductVertexManagerConfig fromUserPayload(UserPayload payload)
+    throws InvalidProtocolBufferException {
+    CartesianProductConfigProto proto =
+      CartesianProductConfigProto.parseFrom(ByteString.copyFrom(payload.getPayload()));
+
+    boolean isPartitioned = proto.getIsPartitioned();
+    String[] sourceVertices = new String[proto.getSourceVerticesList().size()];
+    proto.getSourceVerticesList().toArray(sourceVertices);
+    // an empty partition list is handed to the constructor as null
+    int[] numPartitions =
+      proto.getNumPartitionsCount() == 0 ? null : Ints.toArray(proto.getNumPartitionsList());
+
+    CartesianProductFilterDescriptor filterDescriptor = null;
+    if (proto.hasFilterClassName()) {
+      filterDescriptor = new CartesianProductFilterDescriptor(proto.getFilterClassName());
+      // The filter payload is only meaningful together with a filter class. Handling it inside
+      // this branch avoids dereferencing a null descriptor when a proto carries a payload but
+      // no class name; such an orphan payload is ignored.
+      if (proto.hasFilterUserPayload()) {
+        filterDescriptor.setUserPayload(
+          UserPayload.create(ByteBuffer.wrap(proto.getFilterUserPayload().toByteArray())));
+      }
+    }
+
+    float minFraction = proto.getMinFraction();
+    float maxFraction = proto.getMaxFraction();
+    return new CartesianProductVertexManagerConfig(isPartitioned, sourceVertices, numPartitions,
+      minFraction, maxFraction, filterDescriptor);
+  }
+}
http://git-wip-us.apache.org/repos/asf/tez/blob/1a068b23/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductVertexManagerPartitioned.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductVertexManagerPartitioned.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductVertexManagerPartitioned.java
new file mode 100644
index 0000000..af2abae
--- /dev/null
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductVertexManagerPartitioned.java
@@ -0,0 +1,176 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.runtime.library.cartesianproduct;
+
+import com.google.common.base.Preconditions;
+import com.google.common.primitives.Ints;
+import org.apache.tez.common.ReflectionUtils;
+import org.apache.tez.dag.api.TezReflectionException;
+import org.apache.tez.dag.api.UserPayload;
+import org.apache.tez.dag.api.VertexManagerPluginContext;
+import org.apache.tez.dag.api.VertexManagerPluginContext.ScheduleTaskRequest;
+import org.apache.tez.dag.api.event.VertexState;
+import org.apache.tez.dag.api.event.VertexStateUpdate;
+import org.apache.tez.runtime.api.TaskAttemptIdentifier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.BitSet;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Starts scheduling tasks when number of completed source tasks crosses
+ * min fraction and schedules all task when max fraction is reached
+ */
+class CartesianProductVertexManagerPartitioned extends CartesianProductVertexManagerReal {
+ private CartesianProductVertexManagerConfig config;
+ private List<String> sourceVertices;
+ private int parallelism = 0;
+ private boolean vertexStarted = false;
+ private boolean vertexReconfigured = false;
+ private int numSourceVertexConfigured = 0;
+ private CartesianProductFilter filter;
+ private Map<String, BitSet> sourceTaskCompleted = new HashMap<>();
+ private int numFinishedSrcTasks = 0;
+ private int totalNumSrcTasks = 0;
+ private int lastScheduledTaskId = -1;
+ private static final Logger LOG =
+ LoggerFactory.getLogger(CartesianProductVertexManagerPartitioned.class);
+
+ public CartesianProductVertexManagerPartitioned(VertexManagerPluginContext context) {
+ super(context);
+ }
+
+ @Override
+ public void initialize(CartesianProductVertexManagerConfig config) throws TezReflectionException {
+ this.config = config;
+ this.sourceVertices = config.getSourceVertices();
+ CartesianProductFilterDescriptor filterDescriptor = config.getFilterDescriptor();
+ if (filterDescriptor != null) {
+ try {
+ filter = ReflectionUtils.createClazzInstance(filterDescriptor.getClassName(),
+ new Class[]{UserPayload.class}, new UserPayload[]{filterDescriptor.getUserPayload()});
+ } catch (TezReflectionException e) {
+ LOG.error("Creating filter failed");
+ throw e;
+ }
+ }
+ for (String sourceVertex : sourceVertices) {
+ sourceTaskCompleted.put(sourceVertex, new BitSet());
+ }
+ for (String vertex : sourceVertices) {
+ getContext().registerForVertexStateUpdates(vertex, EnumSet.of(VertexState.CONFIGURED));
+ }
+ getContext().vertexReconfigurationPlanned();
+ }
+
+ private void reconfigureVertex() throws IOException {
+ // try all combinations, check against filter and get final parallelism
+ Map<String, Integer> vertexPartitionMap = new HashMap<>();
+
+ CartesianProductCombination combination =
+ new CartesianProductCombination(Ints.toArray(config.getNumPartitions()));
+ combination.firstTask();
+ do {
+ for (int i = 0; i < sourceVertices.size(); i++) {
+ vertexPartitionMap.put(sourceVertices.get(i), combination.getCombination().get(i));
+ }
+ if (filter == null || filter.isValidCombination(vertexPartitionMap)) {
+ parallelism++;
+ }
+ } while (combination.nextTask());
+ // no need to reconfigure EM because EM already has all necessary information via config object
+ getContext().reconfigureVertex(parallelism, null, null);
+ vertexReconfigured = true;
+ getContext().doneReconfiguringVertex();
+ }
+
+ @Override
+ public synchronized void onVertexStarted(List<TaskAttemptIdentifier> completions)
+ throws Exception {
+ vertexStarted = true;
+ if (completions != null) {
+ for (TaskAttemptIdentifier attempt : completions) {
+ onSourceTaskCompleted(attempt);
+ }
+ }
+ // try schedule because there may be no more vertex state update and source completions
+ tryScheduleTask();
+ }
+
+ @Override
+ public synchronized void onVertexStateUpdated(VertexStateUpdate stateUpdate) throws IOException{
+ Preconditions.checkArgument(stateUpdate.getVertexState() == VertexState.CONFIGURED);
+ if (!vertexReconfigured) {
+ reconfigureVertex();
+ }
+ numSourceVertexConfigured++;
+ totalNumSrcTasks += getContext().getVertexNumTasks(stateUpdate.getVertexName());
+ // try schedule because there may be no more vertex start and source completions
+ tryScheduleTask();
+ }
+
+ @Override
+ public synchronized void onSourceTaskCompleted(TaskAttemptIdentifier attempt) throws Exception {
+ int taskId = attempt.getTaskIdentifier().getIdentifier();
+ String vertex = attempt.getTaskIdentifier().getVertexIdentifier().getName();
+ BitSet bitSet = this.sourceTaskCompleted.get(vertex);
+ if (!bitSet.get(taskId)) {
+ bitSet.set(taskId);
+ numFinishedSrcTasks++;
+ tryScheduleTask();
+ }
+ }
+
+ /**
+ * schedule task as the ascending order of id. Slow start has same behavior as ShuffleVertexManager
+ */
+ private void tryScheduleTask() {
+ // only schedule task when vertex is already started and all source vertices are configured
+ if (!vertexStarted
+ || numSourceVertexConfigured != sourceVertices.size()) {
+ return;
+ }
+ // determine the destination task with largest id to schedule
+ float percentFinishedSrcTask = numFinishedSrcTasks*1f/totalNumSrcTasks;
+ int numTaskToSchedule;
+ if (percentFinishedSrcTask < config.getMinFraction()) {
+ numTaskToSchedule = 0;
+ } else if (config.getMinFraction() <= percentFinishedSrcTask &&
+ percentFinishedSrcTask <= config.getMaxFraction()) {
+ numTaskToSchedule = (int) ((percentFinishedSrcTask-config.getMinFraction())
+ /(config.getMaxFraction()-config.getMinFraction())*parallelism);
+ } else {
+ numTaskToSchedule = parallelism;
+ }
+ // schedule tasks if there are more we can schedule
+ if (numTaskToSchedule-1 > lastScheduledTaskId) {
+ List<ScheduleTaskRequest> scheduleTaskRequests = new ArrayList<>();
+ for (int i = lastScheduledTaskId + 1; i < numTaskToSchedule; i++) {
+ scheduleTaskRequests.add(ScheduleTaskRequest.create(i, null));
+ }
+ lastScheduledTaskId = numTaskToSchedule-1;
+ getContext().scheduleTasks(scheduleTaskRequests);
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/tez/blob/1a068b23/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductVertexManagerReal.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductVertexManagerReal.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductVertexManagerReal.java
new file mode 100644
index 0000000..84e65ac
--- /dev/null
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductVertexManagerReal.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.runtime.library.cartesianproduct;
+
+import org.apache.tez.dag.api.VertexManagerPluginContext;
+import org.apache.tez.dag.api.event.VertexStateUpdate;
+import org.apache.tez.runtime.api.TaskAttemptIdentifier;
+import org.apache.tez.runtime.api.events.VertexManagerEvent;
+
+import java.util.List;
+
+/**
+ * Parent class of the actual cartesian product vertex manager implementations. It keeps the
+ * plugin context and declares the callbacks an implementation has to provide.
+ */
+abstract class CartesianProductVertexManagerReal {
+  private final VertexManagerPluginContext context;
+
+  /**
+   * @param context plugin context made available to implementations via {@link #getContext()}
+   */
+  public CartesianProductVertexManagerReal(VertexManagerPluginContext context) {
+    this.context = context;
+  }
+
+  /**
+   * @return the plugin context given at construction time
+   */
+  public final VertexManagerPluginContext getContext() {
+    return context;
+  }
+
+  /**
+   * Sets up the implementation with the given config.
+   */
+  public abstract void initialize(CartesianProductVertexManagerConfig config) throws Exception;
+
+  /**
+   * Does nothing unless overridden.
+   */
+  public void onVertexManagerEventReceived(VertexManagerEvent vmEvent) {
+    // intentionally empty
+  }
+
+  /**
+   * Callback for vertex start, invoked with a list of source task completions.
+   */
+  public abstract void onVertexStarted(List<TaskAttemptIdentifier> completions) throws Exception;
+
+  /**
+   * Callback for a vertex state update.
+   */
+  public abstract void onVertexStateUpdated(VertexStateUpdate stateUpdate) throws Exception;
+
+  /**
+   * Callback for the completion of a source task attempt.
+   */
+  public abstract void onSourceTaskCompleted(TaskAttemptIdentifier attempt) throws Exception;
+}