You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by mi...@apache.org on 2016/09/06 17:51:14 UTC
[1/2] tez git commit: TEZ-3230. Implement vertex manager and edge
manager of cartesian product edge. (Zhiyuan Yang via mingma)
Repository: tez
Updated Branches:
refs/heads/master af8246931 -> 1a068b239
http://git-wip-us.apache.org/repos/asf/tez/blob/1a068b23/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductVertexManagerUnpartitioned.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductVertexManagerUnpartitioned.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductVertexManagerUnpartitioned.java
new file mode 100644
index 0000000..af7d15e
--- /dev/null
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductVertexManagerUnpartitioned.java
@@ -0,0 +1,178 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.runtime.library.cartesianproduct;
+
+import com.google.common.base.Preconditions;
+import com.google.common.primitives.Ints;
+import org.apache.tez.dag.api.EdgeManagerPluginDescriptor;
+import org.apache.tez.dag.api.EdgeProperty;
+import org.apache.tez.dag.api.UserPayload;
+import org.apache.tez.dag.api.VertexManagerPluginContext;
+import org.apache.tez.dag.api.VertexManagerPluginContext.ScheduleTaskRequest;
+import org.apache.tez.dag.api.event.VertexState;
+import org.apache.tez.dag.api.event.VertexStateUpdate;
+import org.apache.tez.runtime.api.TaskAttemptIdentifier;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.BitSet;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+
+import static org.apache.tez.runtime.library.cartesianproduct.CartesianProductUserPayload.CartesianProductConfigProto;
+
+class CartesianProductVertexManagerUnpartitioned extends CartesianProductVertexManagerReal {
+ List<String> sourceVertices;
+ private int parallelism = 1;
+ private boolean vertexStarted = false;
+ private boolean vertexReconfigured = false;
+ private int numSourceVertexConfigured = 0;
+ private int[] numTasks;
+ private Queue<TaskAttemptIdentifier> pendingCompletedSrcTask = new LinkedList<>();
+ private Map<String, BitSet> sourceTaskCompleted = new HashMap<>();
+ private BitSet scheduledTasks = new BitSet();
+ private CartesianProductConfig config;
+ private int numSrcHasCompletedTask = 0;
+
+ public CartesianProductVertexManagerUnpartitioned(VertexManagerPluginContext context) {
+ super(context);
+ }
+
+ @Override
+ public void initialize(CartesianProductVertexManagerConfig config) throws Exception {
+ sourceVertices = config.getSourceVertices();
+ numTasks = new int[sourceVertices.size()];
+ for (String vertex : sourceVertices) {
+ sourceTaskCompleted.put(vertex, new BitSet());
+ }
+ for (String vertex : sourceVertices) {
+ getContext().registerForVertexStateUpdates(vertex, EnumSet.of(VertexState.CONFIGURED));
+ }
+ this.config = config;
+ getContext().vertexReconfigurationPlanned();
+ }
+
+ private void reconfigureVertex() throws IOException {
+ for (int numTask : numTasks) {
+ parallelism *= numTask;
+ }
+
+ UserPayload payload = null;
+ Map<String, EdgeProperty> edgeProperties = getContext().getInputVertexEdgeProperties();
+ for (EdgeProperty edgeProperty : edgeProperties.values()) {
+ EdgeManagerPluginDescriptor descriptor = edgeProperty.getEdgeManagerDescriptor();
+ if (payload == null) {
+ CartesianProductConfigProto.Builder builder = CartesianProductConfigProto.newBuilder();
+ builder.setIsPartitioned(false).addAllNumTasks(Ints.asList(numTasks))
+ .addAllSourceVertices(config.getSourceVertices());
+ payload = UserPayload.create(ByteBuffer.wrap(builder.build().toByteArray()));
+ }
+ descriptor.setUserPayload(payload);
+ }
+ getContext().reconfigureVertex(parallelism, null, edgeProperties);
+ vertexReconfigured = true;
+ getContext().doneReconfiguringVertex();
+ }
+
+ @Override
+ public synchronized void onVertexStarted(List<TaskAttemptIdentifier> completions)
+ throws Exception {
+ vertexStarted = true;
+ // if vertex is already reconfigured, we can handle pending completions immediately
+ // otherwise we have to wait until vertex is reconfigured
+ if (vertexReconfigured) {
+ Preconditions.checkArgument(pendingCompletedSrcTask.size() == 0,
+ "Unexpected pending source completion on vertex start after vertex reconfiguration");
+ for (TaskAttemptIdentifier taId : completions) {
+ handleCompletedSrcTask(taId);
+ }
+ } else {
+ pendingCompletedSrcTask.addAll(completions);
+ }
+ }
+
+ @Override
+ public synchronized void onVertexStateUpdated(VertexStateUpdate stateUpdate) throws IOException {
+ Preconditions.checkArgument(stateUpdate.getVertexState() == VertexState.CONFIGURED);
+ String vertex = stateUpdate.getVertexName();
+ numTasks[sourceVertices.indexOf(vertex)] = getContext().getVertexNumTasks(vertex);
+ // reconfigure vertex when all source vertices are CONFIGURED
+ if (++numSourceVertexConfigured == sourceVertices.size()) {
+ reconfigureVertex();
+ // handle pending source completions when vertex is started and reconfigured
+ if (vertexStarted) {
+ while (!pendingCompletedSrcTask.isEmpty()) {
+ handleCompletedSrcTask(pendingCompletedSrcTask.poll());
+ }
+ }
+ }
+ }
+
+ @Override
+ public synchronized void onSourceTaskCompleted(TaskAttemptIdentifier attempt) throws Exception {
+ if (numSourceVertexConfigured < sourceVertices.size()) {
+ pendingCompletedSrcTask.add(attempt);
+ return;
+ }
+ Preconditions.checkArgument(pendingCompletedSrcTask.size() == 0,
+ "Unexpected pending src completion on source task completed after vertex reconfiguration");
+ handleCompletedSrcTask(attempt);
+ }
+
+ private void handleCompletedSrcTask(TaskAttemptIdentifier attempt) {
+ int taskId = attempt.getTaskIdentifier().getIdentifier();
+ String vertex = attempt.getTaskIdentifier().getVertexIdentifier().getName();
+ if (sourceTaskCompleted.get(vertex).get(taskId)) {
+ return;
+ }
+
+ if (sourceTaskCompleted.get(vertex).isEmpty()) {
+ numSrcHasCompletedTask++;
+ }
+ sourceTaskCompleted.get(vertex).set(taskId);
+ if (numSrcHasCompletedTask != sourceVertices.size()) {
+ return;
+ }
+
+ List<ScheduleTaskRequest> requests = new ArrayList<>();
+ CartesianProductCombination combination = new CartesianProductCombination(numTasks, sourceVertices.indexOf(vertex));
+ combination.firstTaskWithFixedPartition(taskId);
+ do {
+ List<Integer> list = combination.getCombination();
+ boolean readyToSchedule = true;
+ for (int i = 0; i < list.size(); i++) {
+ if (!sourceTaskCompleted.get(sourceVertices.get(i)).get(list.get(i))) {
+ readyToSchedule = false;
+ break;
+ }
+ }
+ if (readyToSchedule && !scheduledTasks.get(combination.getTaskId())) {
+ requests.add(ScheduleTaskRequest.create(combination.getTaskId(), null));
+ scheduledTasks.set(combination.getTaskId());
+ }
+ } while (combination.nextTaskWithFixedPartition());
+ if (!requests.isEmpty()) {
+ getContext().scheduleTasks(requests);
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/tez/blob/1a068b23/tez-runtime-library/src/main/proto/CartesianProductPayload.proto
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/proto/CartesianProductPayload.proto b/tez-runtime-library/src/main/proto/CartesianProductPayload.proto
new file mode 100644
index 0000000..39ba82c
--- /dev/null
+++ b/tez-runtime-library/src/main/proto/CartesianProductPayload.proto
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+option java_package = "org.apache.tez.runtime.library.cartesianproduct";
+option java_outer_classname = "CartesianProductUserPayload";
+
+// Serialized configuration of a cartesian product edge, carried as the user payload
+// of the vertex manager / edge manager plugins.
+message CartesianProductConfigProto {
+ // true for the partitioned flavour, false for the unpartitioned one
+ required bool isPartitioned = 1;
+ // names of the source vertices taking part in the cartesian product
+ repeated string sourceVertices = 2;
+ // set together with isPartitioned = true; presumably one entry per source vertex,
+ // in the order of sourceVertices - TODO confirm
+ repeated int32 numPartitions = 3;
+ // NOTE(review): looks like the class name and payload of the optional
+ // CartesianProductFilterDescriptor - confirm against CartesianProductConfig
+ optional string filterClassName = 4;
+ optional bytes filterUserPayload = 5;
+ // NOTE(review): meaning of the fractions is not visible in this patch excerpt
+ optional float minFraction = 6;
+ optional float maxFraction = 7;
+ // set together with isPartitioned = false: task count of every source vertex
+ repeated int32 numTasks = 8;
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/tez/blob/1a068b23/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductCombination.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductCombination.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductCombination.java
new file mode 100644
index 0000000..0d6a928
--- /dev/null
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductCombination.java
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.runtime.library.cartesianproduct;
+
+import com.google.common.primitives.Ints;
+import org.junit.Test;
+
+import java.util.List;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests the iteration order and task id numbering of {@link CartesianProductCombination}.
+ */
+public class TestCartesianProductCombination {
+  /**
+   * Asserts that the combination currently points at the expected indices and task id.
+   */
+  private void verifyCombination(CartesianProductCombination combination, int[] result, int taskId) {
+    assertArrayEquals(result, Ints.toArray(combination.getCombination()));
+    assertEquals(taskId, combination.getTaskId());
+  }
+
+  // sizes {2,3}, position 0 fixed to 1: only position 1 varies
+  private void testCombinationTwoWayVertex0() {
+    CartesianProductCombination combination = new CartesianProductCombination(new int[]{2,3}, 0);
+
+    combination.firstTaskWithFixedPartition(1);
+    verifyCombination(combination, new int[]{1,0}, 3);
+    assertTrue(combination.nextTaskWithFixedPartition());
+    verifyCombination(combination, new int[]{1,1}, 4);
+    assertTrue(combination.nextTaskWithFixedPartition());
+    verifyCombination(combination, new int[]{1,2}, 5);
+    assertFalse(combination.nextTaskWithFixedPartition());
+  }
+
+  // sizes {2,3}, position 1 fixed to 1: only position 0 varies
+  private void testCombinationTwoWayVertex1() {
+    CartesianProductCombination combination = new CartesianProductCombination(new int[]{2,3}, 1);
+
+    combination.firstTaskWithFixedPartition(1);
+    verifyCombination(combination, new int[]{0,1}, 1);
+    assertTrue(combination.nextTaskWithFixedPartition());
+    verifyCombination(combination, new int[]{1,1}, 4);
+
+    assertFalse(combination.nextTaskWithFixedPartition());
+  }
+
+  // sizes {2,2,2}, middle position fixed to 1: the outer positions vary
+  private void testCombinationThreeWay() {
+    CartesianProductCombination combination = new CartesianProductCombination(new int[]{2,2,2}, 1);
+
+    combination.firstTaskWithFixedPartition(1);
+    verifyCombination(combination, new int[]{0,1,0}, 2);
+    assertTrue(combination.nextTaskWithFixedPartition());
+    verifyCombination(combination, new int[]{0,1,1}, 3);
+    assertTrue(combination.nextTaskWithFixedPartition());
+    verifyCombination(combination, new int[]{1,1,0}, 6);
+    assertTrue(combination.nextTaskWithFixedPartition());
+    verifyCombination(combination, new int[]{1,1,1}, 7);
+    assertFalse(combination.nextTaskWithFixedPartition());
+  }
+
+  @Test(timeout = 5000)
+  public void testCombinationWithFixedPartition() {
+    // two way cartesian product
+    testCombinationTwoWayVertex0();
+    testCombinationTwoWayVertex1();
+
+    // three way cartesian product
+    testCombinationThreeWay();
+  }
+
+  /**
+   * Walks all 2 x 3 combinations and expects them in row-major order.
+   */
+  @Test(timeout = 5000)
+  public void testCombination() {
+    CartesianProductCombination combination = new CartesianProductCombination(new int[]{2,3});
+    // fetched once on purpose: the test relies on the returned list reflecting the
+    // combination as it advances
+    List<Integer> list = combination.getCombination();
+    for (int i = 0; i < 2; i++) {
+      for (int j = 0; j < 3; j++) {
+        if (i == 0 && j == 0) {
+          combination.firstTask();
+        } else {
+          assertTrue(combination.nextTask());
+        }
+        // assertEquals reports expected/actual on failure, assertTrue(a == b) does not
+        assertEquals(i, list.get(0).intValue());
+        assertEquals(j, list.get(1).intValue());
+      }
+    }
+    assertFalse(combination.nextTask());
+  }
+
+  /**
+   * Task id t of a 2 x 3 product must map to the combination (t / 3, t % 3).
+   */
+  @Test(timeout = 5000)
+  public void testFromTaskId() {
+    for (int i = 0; i < 6; i++) {
+      List<Integer> list = CartesianProductCombination.fromTaskId(new int[]{2,3}, i)
+          .getCombination();
+      assertEquals(i / 3, list.get(0).intValue());
+      assertEquals(i % 3, list.get(1).intValue());
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/tez/blob/1a068b23/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductConfig.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductConfig.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductConfig.java
new file mode 100644
index 0000000..2de750f
--- /dev/null
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductConfig.java
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.runtime.library.cartesianproduct;
+
+import com.google.common.primitives.Ints;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.UserPayload;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+
+/**
+ * Tests that {@link CartesianProductConfig} survives a round trip through its
+ * {@link UserPayload} form.
+ */
+public class TestCartesianProductConfig {
+  private TezConfiguration conf = new TezConfiguration();
+
+  /**
+   * Round trip of a partitioned config that carries a filter descriptor with a payload.
+   */
+  @Test(timeout = 5000)
+  public void testSerializationPartitioned() throws IOException {
+    Map<String, Integer> vertexPartitionMap = new HashMap<>();
+    vertexPartitionMap.put("v1", 2);
+    vertexPartitionMap.put("v2", 3);
+    vertexPartitionMap.put("v3", 4);
+    String filterClassName = "filter";
+    byte[] bytes = new byte[10];
+    (new Random()).nextBytes(bytes);
+    CartesianProductFilterDescriptor filterDescriptor =
+        new CartesianProductFilterDescriptor(filterClassName)
+            .setUserPayload(UserPayload.create(ByteBuffer.wrap(bytes)));
+    CartesianProductConfig config =
+        new CartesianProductConfig(vertexPartitionMap, filterDescriptor);
+    UserPayload payload = config.toUserPayload(conf);
+    CartesianProductConfig parsedConfig = CartesianProductConfig.fromUserPayload(payload);
+    assertConfigEquals(config, parsedConfig);
+  }
+
+  /**
+   * Round trip of an unpartitioned config, followed by a negative check: an
+   * unpartitioned config built with a non-null numPartitions array must be rejected.
+   */
+  @Test(timeout = 5000)
+  public void testSerializationUnpartitioned() throws Exception {
+    List<String> sourceVertices = new ArrayList<>();
+    sourceVertices.add("v1");
+    sourceVertices.add("v2");
+    sourceVertices.add("v3");
+    CartesianProductConfig config =
+        new CartesianProductConfig(sourceVertices);
+    UserPayload payload = config.toUserPayload(conf);
+    CartesianProductConfig parsedConfig = CartesianProductConfig.fromUserPayload(payload);
+    assertConfigEquals(config, parsedConfig);
+
+    // unpartitioned config should have null in numPartitions fields
+    try {
+      config = new CartesianProductConfig(false, new int[]{}, new String[]{"v0","v1"}, null);
+      config.checkNumPartitions();
+    } catch (Exception e) {
+      // expected: either the constructor or the explicit check rejects the config
+      return;
+    }
+    // AssertionError is reported by JUnit as a test failure with a message; the
+    // original bare "new Exception()" surfaced as an unexplained error
+    throw new AssertionError(
+        "Unpartitioned config with non-null numPartitions should have been rejected");
+  }
+
+  /**
+   * Asserts that both configs have the same source vertices, partition counts and
+   * filter descriptor.
+   */
+  private void assertConfigEquals(CartesianProductConfig config1, CartesianProductConfig config2) {
+    assertArrayEquals(config1.getSourceVertices().toArray(new String[0]),
+        config2.getSourceVertices().toArray(new String[0]));
+    if (config1.getNumPartitions() == null) {
+      assertNull(config2.getNumPartitions());
+    } else {
+      assertArrayEquals(Ints.toArray(config1.getNumPartitions()),
+          Ints.toArray(config2.getNumPartitions()));
+    }
+    CartesianProductFilterDescriptor descriptor1 = config1.getFilterDescriptor();
+    // must come from config2: reading config1 twice compared the descriptor with itself
+    CartesianProductFilterDescriptor descriptor2 = config2.getFilterDescriptor();
+
+    if (descriptor1 != null && descriptor2 != null) {
+      assertEquals(descriptor1.getClassName(), descriptor2.getClassName());
+      UserPayload payload1 = descriptor1.getUserPayload();
+      UserPayload payload2 = descriptor2.getUserPayload();
+      if (payload1 != null && payload2 != null) {
+        assertEquals(0, payload1.getPayload().compareTo(payload2.getPayload()));
+      }
+    } else {
+      // at least one is null, so both have to be
+      assertNull(descriptor1);
+      assertNull(descriptor2);
+    }
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/tez/blob/1a068b23/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductEdgeManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductEdgeManager.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductEdgeManager.java
new file mode 100644
index 0000000..9581a6e
--- /dev/null
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductEdgeManager.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.runtime.library.cartesianproduct;
+
+import com.google.common.primitives.Ints;
+import org.apache.tez.dag.api.EdgeManagerPluginContext;
+import org.apache.tez.dag.api.UserPayload;
+import org.junit.Test;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.apache.tez.runtime.library.cartesianproduct.CartesianProductUserPayload.*;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests that {@link CartesianProductEdgeManager} picks its real implementation from the
+ * isPartitioned flag of the payload.
+ */
+public class TestCartesianProductEdgeManager {
+  @Test(timeout = 5000)
+  public void testInitialize() throws Exception {
+    EdgeManagerPluginContext context = mock(EdgeManagerPluginContext.class);
+    when(context.getSourceVertexName()).thenReturn("v0");
+    CartesianProductEdgeManager edgeManager = new CartesianProductEdgeManager(context);
+
+    // partitioned case
+    CartesianProductConfigProto.Builder builder = CartesianProductConfigProto.newBuilder();
+    builder.setIsPartitioned(true)
+        .addAllSourceVertices(Arrays.asList("v0", "v1"))
+        .addAllNumPartitions(Ints.asList(new int[]{2,3}));
+    UserPayload payload = UserPayload.create(ByteBuffer.wrap(builder.build().toByteArray()));
+    when(context.getUserPayload()).thenReturn(payload);
+    edgeManager.initialize();
+    assertTrue(edgeManager.getEdgeManagerReal()
+        instanceof CartesianProductEdgeManagerPartitioned);
+
+    // unpartitioned case
+    List<String> sourceVertices = new ArrayList<>();
+    sourceVertices.add("v0");
+    sourceVertices.add("v1");
+    builder.clear();
+    // the list built above was previously left unused; it holds the same names
+    builder.setIsPartitioned(false)
+        .addAllSourceVertices(sourceVertices)
+        .addAllNumTasks(Ints.asList(new int[]{2,3}));
+    payload = UserPayload.create(ByteBuffer.wrap(builder.build().toByteArray()));
+    when(context.getUserPayload()).thenReturn(payload);
+    when(context.getSourceVertexNumTasks()).thenReturn(2);
+    edgeManager.initialize();
+    assertTrue(edgeManager.getEdgeManagerReal()
+        instanceof CartesianProductEdgeManagerUnpartitioned);
+  }
+}
http://git-wip-us.apache.org/repos/asf/tez/blob/1a068b23/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductEdgeManagerPartitioned.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductEdgeManagerPartitioned.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductEdgeManagerPartitioned.java
new file mode 100644
index 0000000..2e8697d
--- /dev/null
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductEdgeManagerPartitioned.java
@@ -0,0 +1,284 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.runtime.library.cartesianproduct;
+
+import org.apache.tez.dag.api.EdgeManagerPluginContext;
+import org.apache.tez.dag.api.EdgeManagerPluginOnDemand.EventRouteMetadata;
+import org.apache.tez.dag.api.UserPayload;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.nio.ByteBuffer;
+import java.util.Map;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class TestCartesianProductEdgeManagerPartitioned {
+ private EdgeManagerPluginContext mockContext;
+ private CartesianProductEdgeManagerPartitioned edgeManager;
+
+ // Runs before every test: creates a fresh mocked plugin context and a new edge
+ // manager bound to it, so stubbing done by one test does not leak into the next.
+ @Before
+ public void setup() {
+ mockContext = mock(EdgeManagerPluginContext.class);
+ edgeManager = new CartesianProductEdgeManagerPartitioned(mockContext);
+ }
+
+ /**
+ * Vertex v0 has 2 tasks which generate 3 partitions
+ * Vertex v1 has 3 tasks which generate 4 partitions
+ *
+ * No filter is configured, so the destination vertex is stubbed with
+ * 3 * 4 = 12 tasks. The same config is checked from the point of view of
+ * the v0 edge and then of the v1 edge.
+ */
+ @Test(timeout = 5000)
+ public void testTwoWay() throws Exception {
+ CartesianProductEdgeManagerConfig emConfig =
+ new CartesianProductEdgeManagerConfig(true, new String[]{"v0","v1"}, new int[]{3,4}, null, null);
+ when(mockContext.getDestinationVertexNumTasks()).thenReturn(12);
+ testTwoWayV0(emConfig);
+ testTwoWayV1(emConfig);
+ }
+
+ /**
+ * Checks routing on the edge coming from v0 (stubbed with 2 source tasks) for the
+ * two way config passed in by testTwoWay.
+ */
+ private void testTwoWayV0(CartesianProductEdgeManagerConfig config) throws Exception {
+ // the edge manager is re-initialized as the manager of the v0 edge
+ when(mockContext.getSourceVertexName()).thenReturn("v0");
+ when(mockContext.getSourceVertexNumTasks()).thenReturn(2);
+ edgeManager.initialize(config);
+
+ // presumably arguments are (source task index, destination task index) - TODO confirm
+ // against EdgeManagerPluginOnDemand
+ EventRouteMetadata routingData = edgeManager.routeCompositeDataMovementEventToDestination(1, 1);
+ assertNotNull(routingData);
+ assertEquals(1, routingData.getNumEvents());
+ // NOTE(review): source index 0 is consistent with destination task 1 being the
+ // combination (v0 partition 0, v1 partition 1) in row-major order - confirm
+ assertArrayEquals(new int[]{0}, routingData.getSourceIndices());
+ assertArrayEquals(new int[]{1}, routingData.getTargetIndices());
+
+ // same source/destination pair, this time with an explicit middle argument 0
+ // (presumably the source output index): the event is routed
+ routingData = edgeManager.routeDataMovementEventToDestination(1,0,1);
+ assertNotNull(routingData);
+ assertEquals(1, routingData.getNumEvents());
+ assertArrayEquals(new int[]{1}, routingData.getTargetIndices());
+
+ // with middle argument 1 nothing is routed to destination task 1
+ routingData = edgeManager.routeDataMovementEventToDestination(1,1,1);
+ assertNull(routingData);
+
+ routingData = edgeManager.routeInputSourceTaskFailedEventToDestination(1, 1);
+ assertNotNull(routingData);
+ assertEquals(1, routingData.getNumEvents());
+ assertArrayEquals(new int[]{1}, routingData.getTargetIndices());
+
+ assertEquals(1, edgeManager.routeInputErrorEventToSource(1, 1));
+
+ // 12 = stubbed destination task count, 2 = stubbed v0 task count, 3 = v0 partition count
+ assertEquals(12, edgeManager.getNumDestinationConsumerTasks(1));
+ assertEquals(2, edgeManager.getNumDestinationTaskPhysicalInputs(10));
+ assertEquals(3, edgeManager.getNumSourceTaskPhysicalOutputs(2));
+ }
+
+ /**
+ * Checks routing on the edge coming from v1 (stubbed with 3 source tasks) for the
+ * two way config passed in by testTwoWay.
+ */
+ private void testTwoWayV1(CartesianProductEdgeManagerConfig config) throws Exception {
+ // the edge manager is re-initialized as the manager of the v1 edge
+ when(mockContext.getSourceVertexName()).thenReturn("v1");
+ when(mockContext.getSourceVertexNumTasks()).thenReturn(3);
+ edgeManager.initialize(config);
+
+ EventRouteMetadata routingData = edgeManager.routeCompositeDataMovementEventToDestination(1, 1);
+ assertNotNull(routingData);
+ assertEquals(1, routingData.getNumEvents());
+ // NOTE(review): source index 1 is consistent with destination task 1 being the
+ // combination (v0 partition 0, v1 partition 1) in row-major order - confirm
+ assertArrayEquals(new int[]{1}, routingData.getSourceIndices());
+ assertArrayEquals(new int[]{1}, routingData.getTargetIndices());
+
+ routingData = edgeManager.routeInputSourceTaskFailedEventToDestination(1, 1);
+ assertNotNull(routingData);
+ assertEquals(1, routingData.getNumEvents());
+ assertArrayEquals(new int[]{1}, routingData.getTargetIndices());
+
+ assertEquals(2, edgeManager.routeInputErrorEventToSource(1, 2));
+
+ // 12 = stubbed destination task count, 3 = stubbed v1 task count, 4 = v1 partition count
+ assertEquals(12, edgeManager.getNumDestinationConsumerTasks(1));
+ assertEquals(3, edgeManager.getNumDestinationTaskPhysicalInputs(10));
+ assertEquals(4, edgeManager.getNumSourceTaskPhysicalOutputs(2));
+ }
+
+  /**
+   * Filter for the tests: compares the values that the combination map holds for
+   * "v0" and "v1" with the operator stored in the payload.
+   */
+  public static class TestFilter extends CartesianProductFilter {
+    // '>' or '<'; any other character makes the filter accept every combination
+    private final char op;
+
+    /**
+     * @param payload payload whose first two bytes hold the operator as a char
+     */
+    public TestFilter(UserPayload payload) {
+      super(payload);
+      op = payload.getPayload().getChar();
+    }
+
+    @Override
+    public boolean isValidCombination(Map<String, Integer> vertexPartitionMap) {
+      switch (op) {
+        case '>':
+          return vertexPartitionMap.get("v0") > vertexPartitionMap.get("v1");
+        case '<':
+          return vertexPartitionMap.get("v0") < vertexPartitionMap.get("v1");
+        default:
+          return true;
+      }
+    }
+  }
+
+  /**
+   * Vertex v0 has 2 tasks which generate 3 partitions
+   * Vertex v1 has 3 tasks which generate 4 partitions
+   *
+   * The '>' filter keeps only the combinations whose v0 partition is greater than
+   * the v1 partition: (1,0), (2,0) and (2,1). The destination vertex is therefore
+   * stubbed with 3 tasks.
+   */
+  @Test(timeout = 5000)
+  public void testTwoWayWithFilter() throws Exception {
+    // payload of the filter: the operator as a single char
+    ByteBuffer buffer = ByteBuffer.allocate(2);
+    buffer.putChar('>');
+    buffer.flip();
+    CartesianProductFilterDescriptor filterDescriptor =
+        new CartesianProductFilterDescriptor(TestFilter.class.getName())
+            .setUserPayload(UserPayload.create(buffer));
+    CartesianProductEdgeManagerConfig emConfig =
+        new CartesianProductEdgeManagerConfig(true, new String[]{"v0","v1"}, new int[]{3,4}, null,
+            filterDescriptor);
+    when(mockContext.getDestinationVertexNumTasks()).thenReturn(3);
+    testTwoWayV0WithFilter(emConfig);
+    testTwoWayV1WithFilter(emConfig);
+  }
+
+ /**
+ * Checks routing on the edge coming from v0 (stubbed with 2 source tasks) when the
+ * config passed in by testTwoWayWithFilter carries the '>' filter.
+ */
+ private void testTwoWayV0WithFilter(CartesianProductEdgeManagerConfig config) throws Exception {
+ // the edge manager is re-initialized as the manager of the v0 edge
+ when(mockContext.getSourceVertexName()).thenReturn("v0");
+ when(mockContext.getSourceVertexNumTasks()).thenReturn(2);
+ edgeManager.initialize(config);
+
+ EventRouteMetadata routingData = edgeManager.routeCompositeDataMovementEventToDestination(1, 1);
+ assertNotNull(routingData);
+ assertEquals(1, routingData.getNumEvents());
+ // NOTE(review): source index 2 is consistent with destination task 1 being the
+ // second combination accepted by the filter, (v0 partition 2, v1 partition 0) - confirm
+ assertArrayEquals(new int[]{2}, routingData.getSourceIndices());
+ assertArrayEquals(new int[]{1}, routingData.getTargetIndices());
+
+ routingData = edgeManager.routeInputSourceTaskFailedEventToDestination(1, 1);
+ assertNotNull(routingData);
+ assertEquals(1, routingData.getNumEvents());
+ assertArrayEquals(new int[]{1}, routingData.getTargetIndices());
+
+ assertEquals(1, edgeManager.routeInputErrorEventToSource(1, 1));
+
+ // 3 = stubbed destination task count, 2 = stubbed v0 task count, 3 = v0 partition count
+ assertEquals(3, edgeManager.getNumDestinationConsumerTasks(1));
+ assertEquals(2, edgeManager.getNumDestinationTaskPhysicalInputs(1));
+ assertEquals(3, edgeManager.getNumSourceTaskPhysicalOutputs(2));
+ }
+
+ /**
+ * Checks routing on the edge coming from v1 (stubbed with 3 source tasks) when the
+ * config passed in by testTwoWayWithFilter carries the '>' filter.
+ */
+ private void testTwoWayV1WithFilter(CartesianProductEdgeManagerConfig config) throws Exception {
+ // the edge manager is re-initialized as the manager of the v1 edge
+ when(mockContext.getSourceVertexName()).thenReturn("v1");
+ when(mockContext.getSourceVertexNumTasks()).thenReturn(3);
+ edgeManager.initialize(config);
+
+ EventRouteMetadata routingData = edgeManager.routeCompositeDataMovementEventToDestination(1, 1);
+ assertNotNull(routingData);
+ assertEquals(1, routingData.getNumEvents());
+ // NOTE(review): source index 0 is consistent with destination task 1 being the
+ // second combination accepted by the filter, (v0 partition 2, v1 partition 0) - confirm
+ assertArrayEquals(new int[]{0}, routingData.getSourceIndices());
+ assertArrayEquals(new int[]{1}, routingData.getTargetIndices());
+
+ routingData = edgeManager.routeInputSourceTaskFailedEventToDestination(1, 1);
+ assertNotNull(routingData);
+ assertEquals(1, routingData.getNumEvents());
+ assertArrayEquals(new int[]{1}, routingData.getTargetIndices());
+
+ assertEquals(2, edgeManager.routeInputErrorEventToSource(1, 2));
+
+ // 3 = stubbed destination task count, 3 = stubbed v1 task count, 4 = v1 partition count
+ // NOTE(review): destination task index 10 is queried although only 3 destination
+ // tasks are stubbed - the result apparently does not depend on the index; confirm
+ assertEquals(3, edgeManager.getNumDestinationConsumerTasks(1));
+ assertEquals(3, edgeManager.getNumDestinationTaskPhysicalInputs(10));
+ assertEquals(4, edgeManager.getNumSourceTaskPhysicalOutputs(2));
+ }
+
+ /**
+ * Vertex v0 has 2 tasks which generate 4 partitions
+ * Vertex v1 has 3 tasks which generate 3 partitions
+ * Vertex v2 has 4 tasks which generate 2 partitions
+ *
+ * No filter is configured, so the destination vertex is stubbed with
+ * 4 * 3 * 2 = 24 tasks. The same config is checked from the point of view of
+ * each of the three source edges in turn.
+ */
+ @Test(timeout = 5000)
+ public void testThreeWay() throws Exception {
+ CartesianProductEdgeManagerConfig emConfig =
+ new CartesianProductEdgeManagerConfig(true, new String[]{"v0","v1","v2"}, new int[]{4,3,2}, null, null);
+ when(mockContext.getDestinationVertexNumTasks()).thenReturn(24);
+ testThreeWayV0(emConfig);
+ testThreeWayV1(emConfig);
+ testThreeWayV2(emConfig);
+ }
+
+ /**
+  * Checks event routing and edge sizing for the three-way configuration when the mocked
+  * source vertex is "v0" with 2 tasks.
+  */
+ private void testThreeWayV0(CartesianProductEdgeManagerConfig config) throws Exception {
+   when(mockContext.getSourceVertexNumTasks()).thenReturn(2);
+   when(mockContext.getSourceVertexName()).thenReturn("v0");
+   edgeManager.initialize(config);
+
+   // Composite data movement routing for the argument pair (1, 1).
+   EventRouteMetadata dataMovementRoute =
+       edgeManager.routeCompositeDataMovementEventToDestination(1, 1);
+   assertNotNull(dataMovementRoute);
+   assertEquals(1, dataMovementRoute.getNumEvents());
+   assertArrayEquals(new int[]{0}, dataMovementRoute.getSourceIndices());
+   assertArrayEquals(new int[]{1}, dataMovementRoute.getTargetIndices());
+
+   // Source-task-failed routing for the same pair; only the target side is checked.
+   EventRouteMetadata failedTaskRoute =
+       edgeManager.routeInputSourceTaskFailedEventToDestination(1, 1);
+   assertNotNull(failedTaskRoute);
+   assertEquals(1, failedTaskRoute.getNumEvents());
+   assertArrayEquals(new int[]{1}, failedTaskRoute.getTargetIndices());
+
+   int errorSourceTask = edgeManager.routeInputErrorEventToSource(1, 1);
+   assertEquals(1, errorSourceTask);
+
+   // Edge sizing as reported by the edge manager.
+   int consumerTasks = edgeManager.getNumDestinationConsumerTasks(1);
+   assertEquals(24, consumerTasks);
+   int physicalInputs = edgeManager.getNumDestinationTaskPhysicalInputs(10);
+   assertEquals(2, physicalInputs);
+   int physicalOutputs = edgeManager.getNumSourceTaskPhysicalOutputs(2);
+   assertEquals(4, physicalOutputs);
+ }
+
+ /**
+  * Checks event routing and edge sizing for the three-way configuration when the mocked
+  * source vertex is "v1" with 3 tasks.
+  */
+ private void testThreeWayV1(CartesianProductEdgeManagerConfig config) throws Exception {
+   when(mockContext.getSourceVertexNumTasks()).thenReturn(3);
+   when(mockContext.getSourceVertexName()).thenReturn("v1");
+   edgeManager.initialize(config);
+
+   // Composite data movement routing for the argument pair (1, 1).
+   EventRouteMetadata dataMovementRoute =
+       edgeManager.routeCompositeDataMovementEventToDestination(1, 1);
+   assertNotNull(dataMovementRoute);
+   assertEquals(1, dataMovementRoute.getNumEvents());
+   assertArrayEquals(new int[]{0}, dataMovementRoute.getSourceIndices());
+   assertArrayEquals(new int[]{1}, dataMovementRoute.getTargetIndices());
+
+   // Source-task-failed routing for the same pair; only the target side is checked.
+   EventRouteMetadata failedTaskRoute =
+       edgeManager.routeInputSourceTaskFailedEventToDestination(1, 1);
+   assertNotNull(failedTaskRoute);
+   assertEquals(1, failedTaskRoute.getNumEvents());
+   assertArrayEquals(new int[]{1}, failedTaskRoute.getTargetIndices());
+
+   int errorSourceTask = edgeManager.routeInputErrorEventToSource(1, 2);
+   assertEquals(2, errorSourceTask);
+
+   // Edge sizing as reported by the edge manager.
+   int consumerTasks = edgeManager.getNumDestinationConsumerTasks(1);
+   assertEquals(24, consumerTasks);
+   int physicalInputs = edgeManager.getNumDestinationTaskPhysicalInputs(10);
+   assertEquals(3, physicalInputs);
+   int physicalOutputs = edgeManager.getNumSourceTaskPhysicalOutputs(2);
+   assertEquals(3, physicalOutputs);
+ }
+
+ /**
+  * Checks event routing and edge sizing for the three-way configuration when the mocked
+  * source vertex is "v2" with 4 tasks.
+  */
+ private void testThreeWayV2(CartesianProductEdgeManagerConfig config) throws Exception {
+   when(mockContext.getSourceVertexNumTasks()).thenReturn(4);
+   when(mockContext.getSourceVertexName()).thenReturn("v2");
+   edgeManager.initialize(config);
+
+   // Composite data movement routing for the argument pair (1, 1).
+   EventRouteMetadata dataMovementRoute =
+       edgeManager.routeCompositeDataMovementEventToDestination(1, 1);
+   assertNotNull(dataMovementRoute);
+   assertEquals(1, dataMovementRoute.getNumEvents());
+   assertArrayEquals(new int[]{1}, dataMovementRoute.getSourceIndices());
+   assertArrayEquals(new int[]{1}, dataMovementRoute.getTargetIndices());
+
+   // Source-task-failed routing for the same pair; only the target side is checked.
+   EventRouteMetadata failedTaskRoute =
+       edgeManager.routeInputSourceTaskFailedEventToDestination(1, 1);
+   assertNotNull(failedTaskRoute);
+   assertEquals(1, failedTaskRoute.getNumEvents());
+   assertArrayEquals(new int[]{1}, failedTaskRoute.getTargetIndices());
+
+   int errorSourceTask = edgeManager.routeInputErrorEventToSource(1, 2);
+   assertEquals(2, errorSourceTask);
+
+   // Edge sizing as reported by the edge manager.
+   int consumerTasks = edgeManager.getNumDestinationConsumerTasks(1);
+   assertEquals(24, consumerTasks);
+   int physicalInputs = edgeManager.getNumDestinationTaskPhysicalInputs(10);
+   assertEquals(4, physicalInputs);
+   int physicalOutputs = edgeManager.getNumSourceTaskPhysicalOutputs(2);
+   assertEquals(2, physicalOutputs);
+ }
+}
http://git-wip-us.apache.org/repos/asf/tez/blob/1a068b23/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductEdgeManagerUnpartitioned.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductEdgeManagerUnpartitioned.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductEdgeManagerUnpartitioned.java
new file mode 100644
index 0000000..4c69482
--- /dev/null
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductEdgeManagerUnpartitioned.java
@@ -0,0 +1,240 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.runtime.library.cartesianproduct;
+
+import org.apache.tez.dag.api.EdgeManagerPluginContext;
+import org.apache.tez.dag.api.EdgeManagerPluginOnDemand.EventRouteMetadata;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests for {@link CartesianProductEdgeManagerUnpartitioned}: event routing and edge sizing
+ * for two-way and three-way products, plus the case where one source vertex has no tasks.
+ */
+public class TestCartesianProductEdgeManagerUnpartitioned {
+  private EdgeManagerPluginContext mockContext;
+  private CartesianProductEdgeManagerUnpartitioned edgeManager;
+
+  @Before
+  public void setup() {
+    mockContext = mock(EdgeManagerPluginContext.class);
+    edgeManager = new CartesianProductEdgeManagerUnpartitioned(mockContext);
+  }
+
+  /**
+   * Makes the mocked context report the given source vertex name and task count, then
+   * (re)initializes the edge manager with {@code config}.
+   */
+  private void initializeFor(String sourceVertex, int numSourceTasks,
+                             CartesianProductEdgeManagerConfig config) {
+    when(mockContext.getSourceVertexName()).thenReturn(sourceVertex);
+    when(mockContext.getSourceVertexNumTasks()).thenReturn(numSourceTasks);
+    edgeManager.initialize(config);
+  }
+
+  /**
+   * Checks both routing entry points for source task {@code srcTaskId}: nothing may be routed
+   * to {@code unroutedDestTaskId}, while {@code routedDestTaskId} must receive exactly one
+   * event on target index 0.
+   *
+   * The source-task-failed check deliberately calls
+   * {@code routeInputSourceTaskFailedEventToDestination} for the routed destination as well;
+   * earlier versions of these tests repeated the composite data movement call there, which
+   * left the positive failed-event path untested. Only the event count and target indices
+   * are asserted for failed events, matching the partitioned edge manager tests.
+   */
+  private void verifyRouting(int srcTaskId, int unroutedDestTaskId, int routedDestTaskId)
+      throws Exception {
+    assertNull(
+        edgeManager.routeCompositeDataMovementEventToDestination(srcTaskId, unroutedDestTaskId));
+
+    EventRouteMetadata routingData =
+        edgeManager.routeCompositeDataMovementEventToDestination(srcTaskId, routedDestTaskId);
+    assertNotNull(routingData);
+    assertEquals(1, routingData.getNumEvents());
+    assertArrayEquals(new int[]{0}, routingData.getTargetIndices());
+    assertArrayEquals(new int[]{0}, routingData.getSourceIndices());
+
+    assertNull(
+        edgeManager.routeInputSourceTaskFailedEventToDestination(srcTaskId, unroutedDestTaskId));
+
+    routingData =
+        edgeManager.routeInputSourceTaskFailedEventToDestination(srcTaskId, routedDestTaskId);
+    assertNotNull(routingData);
+    assertEquals(1, routingData.getNumEvents());
+    assertArrayEquals(new int[]{0}, routingData.getTargetIndices());
+  }
+
+  /**
+   * Checks that task 1 has exactly one physical input and one physical output on this edge,
+   * and that source task 1 has {@code expectedConsumerTasks} destination consumers.
+   */
+  private void verifyEdgeSizes(int expectedConsumerTasks) throws Exception {
+    assertEquals(1, edgeManager.getNumDestinationTaskPhysicalInputs(1));
+    assertEquals(1, edgeManager.getNumSourceTaskPhysicalOutputs(1));
+    assertEquals(expectedConsumerTasks, edgeManager.getNumDestinationConsumerTasks(1));
+  }
+
+  /**
+   * Vertex v0 has 2 tasks
+   * Vertex v1 has 3 tasks
+   */
+  @Test(timeout = 5000)
+  public void testTwoWay() throws Exception {
+    CartesianProductEdgeManagerConfig emConfig =
+      new CartesianProductEdgeManagerConfig(false, new String[]{"v0","v1"}, null, new int[]{2,3}, null);
+    testTwoWayV0(emConfig);
+    testTwoWayV1(emConfig);
+  }
+
+  /** Two-way product seen from source vertex v0 (2 tasks). */
+  private void testTwoWayV0(CartesianProductEdgeManagerConfig config) throws Exception {
+    initializeFor("v0", 2, config);
+    verifyRouting(1, 1, 3);
+    assertEquals(0, edgeManager.routeInputErrorEventToSource(1, 0));
+    verifyEdgeSizes(3);
+  }
+
+  /** Two-way product seen from source vertex v1 (3 tasks). */
+  private void testTwoWayV1(CartesianProductEdgeManagerConfig config) throws Exception {
+    initializeFor("v1", 3, config);
+    verifyRouting(1, 2, 1);
+    assertEquals(1, edgeManager.routeInputErrorEventToSource(1, 0));
+    verifyEdgeSizes(2);
+  }
+
+  /**
+   * Vertex v0 has 2 tasks
+   * Vertex v1 has 3 tasks
+   * Vertex v2 has 4 tasks
+   */
+  @Test(timeout = 5000)
+  public void testThreeWay() throws Exception {
+    CartesianProductEdgeManagerConfig emConfig =
+      new CartesianProductEdgeManagerConfig(false, new String[]{"v0","v1","v2"}, null, new int[]{2,3,4}, null);
+    testThreeWayV0(emConfig);
+    testThreeWayV1(emConfig);
+    testThreeWayV2(emConfig);
+  }
+
+  /** Three-way product seen from source vertex v0 (2 tasks). */
+  private void testThreeWayV0(CartesianProductEdgeManagerConfig config) throws Exception {
+    initializeFor("v0", 2, config);
+    verifyRouting(1, 1, 12);
+    assertEquals(0, edgeManager.routeInputErrorEventToSource(1, 0));
+    verifyEdgeSizes(12);
+  }
+
+  /** Three-way product seen from source vertex v1 (3 tasks). */
+  private void testThreeWayV1(CartesianProductEdgeManagerConfig config) throws Exception {
+    initializeFor("v1", 3, config);
+    verifyRouting(1, 1, 16);
+    assertEquals(0, edgeManager.routeInputErrorEventToSource(1, 0));
+    verifyEdgeSizes(8);
+  }
+
+  /** Three-way product seen from source vertex v2 (4 tasks). */
+  private void testThreeWayV2(CartesianProductEdgeManagerConfig config) throws Exception {
+    initializeFor("v2", 4, config);
+    verifyRouting(1, 0, 13);
+    assertEquals(1, edgeManager.routeInputErrorEventToSource(1, 0));
+    verifyEdgeSizes(6);
+  }
+
+  /**
+   * Vertex v0 has 2 tasks
+   * Vertex v1 has 0 tasks
+   */
+  @Test(timeout = 5000)
+  public void testZeroSrcTask() {
+    CartesianProductEdgeManagerConfig emConfig =
+      new CartesianProductEdgeManagerConfig(false, new String[]{"v0", "v1"}, null, new int[]{2, 0}, null);
+    testZeroSrcTaskV0(emConfig);
+    testZeroSrcTaskV1(emConfig);
+  }
+
+  /** With an empty v1, no v0 task is expected to have any destination consumer. */
+  private void testZeroSrcTaskV0(CartesianProductEdgeManagerConfig config) {
+    initializeFor("v0", 2, config);
+
+    assertEquals(0, edgeManager.getNumDestinationConsumerTasks(0));
+    assertEquals(0, edgeManager.getNumDestinationConsumerTasks(1));
+  }
+
+  /** Only checks that initialization completes without throwing for the empty source vertex. */
+  private void testZeroSrcTaskV1(CartesianProductEdgeManagerConfig config) {
+    initializeFor("v1", 0, config);
+  }
+}
http://git-wip-us.apache.org/repos/asf/tez/blob/1a068b23/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductVertexManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductVertexManager.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductVertexManager.java
new file mode 100644
index 0000000..755c578
--- /dev/null
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductVertexManager.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.runtime.library.cartesianproduct;
+
+import org.apache.tez.dag.api.EdgeManagerPluginDescriptor;
+import org.apache.tez.dag.api.EdgeProperty;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.VertexManagerPluginContext;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class TestCartesianProductVertexManager {
+ @Test(timeout = 5000)
+ public void testInitialize() throws Exception {
+ VertexManagerPluginContext context = mock(VertexManagerPluginContext.class);
+ CartesianProductVertexManager vertexManager = new CartesianProductVertexManager(context);
+ TezConfiguration conf = new TezConfiguration();
+
+ // partitioned case
+ CartesianProductConfig config =
+ new CartesianProductConfig(new int[]{2,3}, new String[]{"v0", "v1"}, null);
+ when(context.getUserPayload()).thenReturn(config.toUserPayload(conf));
+ EdgeProperty edgeProperty =
+ EdgeProperty.create(EdgeManagerPluginDescriptor.create(
+ CartesianProductEdgeManager.class.getName()), null, null, null, null);
+ Map<String, EdgeProperty> edgePropertyMap = new HashMap<>();
+ edgePropertyMap.put("v0", edgeProperty);
+ edgePropertyMap.put("v1", edgeProperty);
+ when(context.getInputVertexEdgeProperties()).thenReturn(edgePropertyMap);
+ vertexManager.initialize();
+ assertTrue(vertexManager.getVertexManagerReal()
+ instanceof CartesianProductVertexManagerPartitioned);
+
+ // unpartitioned case
+ List<String> sourceVertices = new ArrayList<>();
+ sourceVertices.add("v0");
+ sourceVertices.add("v1");
+ config = new CartesianProductConfig(sourceVertices);
+ when(context.getUserPayload()).thenReturn(config.toUserPayload(conf));
+ vertexManager.initialize();
+ assertTrue(vertexManager.getVertexManagerReal()
+ instanceof CartesianProductVertexManagerUnpartitioned);
+ }
+}
http://git-wip-us.apache.org/repos/asf/tez/blob/1a068b23/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductVertexManagerPartitioned.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductVertexManagerPartitioned.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductVertexManagerPartitioned.java
new file mode 100644
index 0000000..9aca647
--- /dev/null
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductVertexManagerPartitioned.java
@@ -0,0 +1,230 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.runtime.library.cartesianproduct;
+
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.tez.dag.api.EdgeManagerPluginDescriptor;
+import org.apache.tez.dag.api.EdgeProperty;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.UserPayload;
+import org.apache.tez.dag.api.VertexLocationHint;
+import org.apache.tez.dag.api.VertexManagerPluginContext;
+import org.apache.tez.dag.api.VertexManagerPluginContext.ScheduleTaskRequest;
+import org.apache.tez.dag.api.event.VertexState;
+import org.apache.tez.dag.api.event.VertexStateUpdate;
+import org.apache.tez.dag.records.TaskAttemptIdentifierImpl;
+import org.apache.tez.dag.records.TezDAGID;
+import org.apache.tez.dag.records.TezTaskAttemptID;
+import org.apache.tez.dag.records.TezTaskID;
+import org.apache.tez.dag.records.TezVertexID;
+import org.apache.tez.runtime.api.TaskAttemptIdentifier;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Captor;
+import org.mockito.Matchers;
+import org.mockito.Mockito;
+import org.mockito.MockitoAnnotations;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.mockito.Matchers.isNull;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests for the partitioned flavour of {@link CartesianProductVertexManager}: vertex
+ * reconfiguration and task scheduling as source task completions are reported.
+ */
+public class TestCartesianProductVertexManagerPartitioned {
+  @Captor
+  private ArgumentCaptor<Map<String, EdgeProperty>> edgePropertiesCaptor;
+  @Captor
+  private ArgumentCaptor<List<ScheduleTaskRequest>> scheduleTaskRequestCaptor;
+  private TezConfiguration conf = new TezConfiguration();
+
+  @Before
+  public void init() {
+    // populates the @Captor fields above
+    MockitoAnnotations.initMocks(this);
+  }
+
+  /** Filter that accepts only combinations where v0's value is strictly greater than v1's. */
+  public static class TestFilter extends CartesianProductFilter {
+    public TestFilter(UserPayload payload) {
+      super(payload);
+    }
+
+    @Override
+    public boolean isValidCombination(Map<String, Integer> vertexPartitionMap) {
+      return vertexPartitionMap.get("v0") > vertexPartitionMap.get("v1");
+    }
+  }
+
+  /** Creates the edge property shared by every input edge in these tests. */
+  private EdgeProperty createCartesianProductEdgeProperty() {
+    return EdgeProperty.create(EdgeManagerPluginDescriptor.create(
+      CartesianProductEdgeManager.class.getName()), null, null, null, null);
+  }
+
+  /**
+   * Builds a mocked plugin context for a {2,2} configuration over source vertices v0 and v1,
+   * each reporting 4 tasks, with a cartesian product edge from both sources.
+   */
+  private VertexManagerPluginContext createTwoByTwoContext() throws Exception {
+    CartesianProductConfig config = new CartesianProductConfig(new int[]{2,2},
+      new String[]{"v0", "v1"}, null);
+    VertexManagerPluginContext mockContext = mock(VertexManagerPluginContext.class);
+    when(mockContext.getUserPayload()).thenReturn(config.toUserPayload(conf));
+
+    Set<String> inputVertices = new HashSet<>();
+    inputVertices.add("v0");
+    inputVertices.add("v1");
+    when(mockContext.getVertexInputNames()).thenReturn(inputVertices);
+    when(mockContext.getVertexNumTasks("v0")).thenReturn(4);
+    when(mockContext.getVertexNumTasks("v1")).thenReturn(4);
+
+    EdgeProperty edgeProperty = createCartesianProductEdgeProperty();
+    Map<String, EdgeProperty> inputEdgeProperties = new HashMap<>();
+    inputEdgeProperties.put("v0", edgeProperty);
+    inputEdgeProperties.put("v1", edgeProperty);
+    when(mockContext.getInputVertexEdgeProperties()).thenReturn(inputEdgeProperties);
+    return mockContext;
+  }
+
+  /**
+   * Initializes a vertex manager with {@code config}, reports v0 as CONFIGURED and checks that
+   * the vertex is reconfigured exactly once with the expected {@code parallelism}, a null
+   * location hint and null edge properties.
+   */
+  private void testReconfigureVertexHelper(CartesianProductConfig config, int parallelism)
+    throws Exception {
+    VertexManagerPluginContext mockContext = mock(VertexManagerPluginContext.class);
+    when(mockContext.getUserPayload()).thenReturn(config.toUserPayload(conf));
+
+    EdgeProperty edgeProperty = createCartesianProductEdgeProperty();
+    Map<String, EdgeProperty> inputEdgeProperties = new HashMap<>();
+    for (String vertex : config.getSourceVertices()) {
+      inputEdgeProperties.put(vertex, edgeProperty);
+    }
+    when(mockContext.getInputVertexEdgeProperties()).thenReturn(inputEdgeProperties);
+    CartesianProductVertexManager vertexManager = new CartesianProductVertexManager(mockContext);
+    vertexManager.initialize();
+    ArgumentCaptor<Integer> parallelismCaptor = ArgumentCaptor.forClass(Integer.class);
+
+    vertexManager.onVertexStateUpdated(new VertexStateUpdate("v0", VertexState.CONFIGURED));
+    verify(mockContext, times(1)).reconfigureVertex(parallelismCaptor.capture(),
+      isNull(VertexLocationHint.class), edgePropertiesCaptor.capture());
+    // expected value first, so a failure message reports the two numbers the right way round
+    assertEquals(parallelism, (int) parallelismCaptor.getValue());
+    assertNull(edgePropertiesCaptor.getValue());
+  }
+
+  /**
+   * A {5,5} configuration is expected to yield parallelism 10 when {@link TestFilter} is
+   * applied and 25 without a filter.
+   */
+  @Test(timeout = 5000)
+  public void testReconfigureVertex() throws Exception {
+    testReconfigureVertexHelper(
+      new CartesianProductConfig(new int[]{5,5}, new String[]{"v0", "v1"},
+        new CartesianProductFilterDescriptor(TestFilter.class.getName())), 10);
+    testReconfigureVertexHelper(
+      new CartesianProductConfig(new int[]{5,5}, new String[]{"v0", "v1"}, null), 25);
+  }
+
+  /**
+   * Reports eight source task completions one at a time and checks when tasks get scheduled:
+   * nothing after the first two, one task (indices 0 through 3, in order) after each of the
+   * next four, and nothing more after the last two.
+   */
+  @Test(timeout = 5000)
+  public void testScheduling() throws Exception {
+    VertexManagerPluginContext mockContext = createTwoByTwoContext();
+    CartesianProductVertexManager vertexManager = new CartesianProductVertexManager(mockContext);
+    vertexManager.initialize();
+
+    vertexManager.onVertexStarted(new ArrayList<TaskAttemptIdentifier>());
+    vertexManager.onVertexStateUpdated(new VertexStateUpdate("v0", VertexState.CONFIGURED));
+    vertexManager.onVertexStateUpdated(new VertexStateUpdate("v1", VertexState.CONFIGURED));
+
+    // A single deep-stubbed identifier is reused for every completion; consecutive stubbed
+    // return values make each call report the next (vertex name, task index) pair.
+    TaskAttemptIdentifier taId = mock(TaskAttemptIdentifier.class, Mockito.RETURNS_DEEP_STUBS);
+    when(taId.getTaskIdentifier().getVertexIdentifier().getName()).thenReturn("v0", "v0", "v1",
+      "v1", "v0", "v0", "v1", "v1");
+    when(taId.getTaskIdentifier().getIdentifier()).thenReturn(0, 1, 0, 1, 2, 3, 2, 3);
+
+    for (int i = 0; i < 2; i++) {
+      vertexManager.onSourceTaskCompleted(taId);
+      verify(mockContext, never()).scheduleTasks(Matchers.<List<ScheduleTaskRequest>>any());
+    }
+
+    // Each of the next four completions must trigger one more scheduleTasks call carrying a
+    // single request for the next task index.
+    for (int expectedTaskIndex = 0; expectedTaskIndex < 4; expectedTaskIndex++) {
+      vertexManager.onSourceTaskCompleted(taId);
+      verify(mockContext, times(expectedTaskIndex + 1))
+        .scheduleTasks(scheduleTaskRequestCaptor.capture());
+      List<ScheduleTaskRequest> scheduleTaskRequests = scheduleTaskRequestCaptor.getValue();
+      assertEquals(1, scheduleTaskRequests.size());
+      assertEquals(expectedTaskIndex, scheduleTaskRequests.get(0).getTaskIndex());
+    }
+
+    for (int i = 0; i < 2; i++) {
+      vertexManager.onSourceTaskCompleted(taId);
+      verify(mockContext, times(4)).scheduleTasks(Matchers.<List<ScheduleTaskRequest>>any());
+    }
+  }
+
+  /**
+   * Starts the vertex with v0 tasks 0 and 1 and v1 task 0 already completed and checks that
+   * exactly one scheduling call is made, containing a single request for task index 0.
+   */
+  @Test(timeout = 5000)
+  public void testVertexStartWithCompletion() throws Exception {
+    VertexManagerPluginContext mockContext = createTwoByTwoContext();
+    CartesianProductVertexManager vertexManager = new CartesianProductVertexManager(mockContext);
+    vertexManager.initialize();
+
+    vertexManager.onVertexStateUpdated(new VertexStateUpdate("v0", VertexState.CONFIGURED));
+    vertexManager.onVertexStateUpdated(new VertexStateUpdate("v1", VertexState.CONFIGURED));
+
+    List<TaskAttemptIdentifier> completions = new ArrayList<>();
+    TezDAGID dagId = TezDAGID.getInstance(ApplicationId.newInstance(0, 0), 0);
+    TezVertexID v0Id = TezVertexID.getInstance(dagId, 0);
+    TezVertexID v1Id = TezVertexID.getInstance(dagId, 1);
+
+    completions.add(new TaskAttemptIdentifierImpl("dag", "v0",
+      TezTaskAttemptID.getInstance(TezTaskID.getInstance(v0Id, 0), 0)));
+    completions.add(new TaskAttemptIdentifierImpl("dag", "v0",
+      TezTaskAttemptID.getInstance(TezTaskID.getInstance(v0Id, 1), 0)));
+    completions.add(new TaskAttemptIdentifierImpl("dag", "v1",
+      TezTaskAttemptID.getInstance(TezTaskID.getInstance(v1Id, 0), 0)));
+
+    vertexManager.onVertexStarted(completions);
+
+    verify(mockContext, times(1)).scheduleTasks(scheduleTaskRequestCaptor.capture());
+    List<ScheduleTaskRequest> scheduleTaskRequests = scheduleTaskRequestCaptor.getValue();
+    assertEquals(1, scheduleTaskRequests.size());
+    assertEquals(0, scheduleTaskRequests.get(0).getTaskIndex());
+  }
+}
http://git-wip-us.apache.org/repos/asf/tez/blob/1a068b23/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductVertexManagerUnpartitioned.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductVertexManagerUnpartitioned.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductVertexManagerUnpartitioned.java
new file mode 100644
index 0000000..f76de96
--- /dev/null
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductVertexManagerUnpartitioned.java
@@ -0,0 +1,194 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.runtime.library.cartesianproduct;
+
+import org.apache.tez.dag.api.EdgeProperty;
+import org.apache.tez.dag.api.UserPayload;
+import org.apache.tez.dag.api.VertexLocationHint;
+import org.apache.tez.dag.api.VertexManagerPluginContext;
+import org.apache.tez.dag.api.VertexManagerPluginContext.ScheduleTaskRequest;
+import org.apache.tez.dag.api.event.VertexState;
+import org.apache.tez.dag.api.event.VertexStateUpdate;
+import org.apache.tez.dag.records.TaskAttemptIdentifierImpl;
+import org.apache.tez.dag.records.TezDAGID;
+import org.apache.tez.dag.records.TezTaskAttemptID;
+import org.apache.tez.dag.records.TezTaskID;
+import org.apache.tez.dag.records.TezVertexID;
+import org.apache.tez.runtime.api.TaskAttemptIdentifier;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Captor;
+import org.mockito.Matchers;
+import org.mockito.MockitoAnnotations;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyInt;
+import static org.mockito.Matchers.anyMapOf;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Matchers.isNull;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+public class TestCartesianProductVertexManagerUnpartitioned {
+ @Captor
+ private ArgumentCaptor<Map<String, EdgeProperty>> edgePropertiesCaptor;
+ @Captor
+ private ArgumentCaptor<List<ScheduleTaskRequest>> scheduleTaskRequestCaptor;
+ private CartesianProductVertexManagerUnpartitioned vertexManager;
+ private VertexManagerPluginContext context;
+ private List<TaskAttemptIdentifier> allCompletions;
+
+ @Before
+ public void setup() throws Exception {
+ MockitoAnnotations.initMocks(this);
+ context = mock(VertexManagerPluginContext.class);
+ vertexManager = new CartesianProductVertexManagerUnpartitioned(context);
+ when(context.getVertexNumTasks(eq("v0"))).thenReturn(2);
+ when(context.getVertexNumTasks(eq("v1"))).thenReturn(3);
+
+ CartesianProductVertexManagerConfig config =
+ new CartesianProductVertexManagerConfig(false, new String[]{"v0","v1"}, null, 0, 0, null);
+ vertexManager.initialize(config);
+ allCompletions = new ArrayList<>();
+ allCompletions.add(new TaskAttemptIdentifierImpl("dag", "v0",
+ TezTaskAttemptID.getInstance(TezTaskID.getInstance(TezVertexID.getInstance(
+ TezDAGID.getInstance("0", 0, 0), 0), 0), 0)));
+ allCompletions.add(new TaskAttemptIdentifierImpl("dag", "v0",
+ TezTaskAttemptID.getInstance(TezTaskID.getInstance(TezVertexID.getInstance(
+ TezDAGID.getInstance("0", 0, 0), 0), 0), 1)));
+ allCompletions.add(new TaskAttemptIdentifierImpl("dag", "v1",
+ TezTaskAttemptID.getInstance(TezTaskID.getInstance(TezVertexID.getInstance(
+ TezDAGID.getInstance("0", 0, 0), 1), 0), 0)));
+ allCompletions.add(new TaskAttemptIdentifierImpl("dag", "v1",
+ TezTaskAttemptID.getInstance(TezTaskID.getInstance(TezVertexID.getInstance(
+ TezDAGID.getInstance("0", 0, 0), 1), 0), 1)));
+ allCompletions.add(new TaskAttemptIdentifierImpl("dag", "v1",
+ TezTaskAttemptID.getInstance(TezTaskID.getInstance(TezVertexID.getInstance(
+ TezDAGID.getInstance("0", 0, 0), 1), 0), 2)));
+ }
+
+ @Test(timeout = 5000)
+ public void testReconfigureVertex() throws Exception {
+ ArgumentCaptor<Integer> parallelismCaptor = ArgumentCaptor.forClass(Integer.class);
+ vertexManager.onVertexStateUpdated(new VertexStateUpdate("v0", VertexState.CONFIGURED));
+ verify(context, never()).reconfigureVertex(
+ anyInt(), any(VertexLocationHint.class), anyMapOf(String.class, EdgeProperty.class));
+ vertexManager.onVertexStateUpdated(new VertexStateUpdate("v1", VertexState.CONFIGURED));
+ verify(context, times(1)).reconfigureVertex(parallelismCaptor.capture(),
+ isNull(VertexLocationHint.class), edgePropertiesCaptor.capture());
+ assertEquals(6, (int)parallelismCaptor.getValue());
+ Map<String, EdgeProperty> edgeProperties = edgePropertiesCaptor.getValue();
+ for (EdgeProperty edgeProperty : edgeProperties.values()) {
+ UserPayload payload = edgeProperty.getEdgeManagerDescriptor().getUserPayload();
+ CartesianProductEdgeManagerConfig newConfig =
+ CartesianProductEdgeManagerConfig.fromUserPayload(payload);
+ assertArrayEquals(new int[]{2,3}, newConfig.getNumTasks());
+ }
+ }
+
+ @Test(timeout = 5000)
+ public void testCompletionAfterReconfigured() throws Exception {
+ vertexManager.onVertexStarted(new ArrayList<TaskAttemptIdentifier>());
+ vertexManager.onVertexStateUpdated(new VertexStateUpdate("v0", VertexState.CONFIGURED));
+ vertexManager.onVertexStateUpdated(new VertexStateUpdate("v1", VertexState.CONFIGURED));
+ verify(context, never()).scheduleTasks(Matchers.<List<ScheduleTaskRequest>>any());
+ vertexManager.onSourceTaskCompleted(allCompletions.get(0));
+ verify(context, never()).scheduleTasks(Matchers.<List<ScheduleTaskRequest>>any());
+ vertexManager.onSourceTaskCompleted(allCompletions.get(2));
+ verify(context, times(1)).scheduleTasks(scheduleTaskRequestCaptor.capture());
+ List<ScheduleTaskRequest> requests = scheduleTaskRequestCaptor.getValue();
+ assertNotNull(requests);
+ assertEquals(1, requests.size());
+ assertEquals(0, requests.get(0).getTaskIndex());
+ }
+
+ @Test(timeout = 5000)
+ public void testCompletionBeforeReconfigured() throws Exception {
+ vertexManager.onVertexStarted(new ArrayList<TaskAttemptIdentifier>());
+ vertexManager.onSourceTaskCompleted(allCompletions.get(0));
+ verify(context, never()).scheduleTasks(Matchers.<List<ScheduleTaskRequest>>any());
+ vertexManager.onSourceTaskCompleted(allCompletions.get(2));
+ verify(context, never()).scheduleTasks(Matchers.<List<ScheduleTaskRequest>>any());
+ vertexManager.onVertexStateUpdated(new VertexStateUpdate("v0", VertexState.CONFIGURED));
+ verify(context, never()).scheduleTasks(Matchers.<List<ScheduleTaskRequest>>any());
+ vertexManager.onVertexStateUpdated(new VertexStateUpdate("v1", VertexState.CONFIGURED));
+ verify(context, times(1)).scheduleTasks(scheduleTaskRequestCaptor.capture());
+ List<ScheduleTaskRequest> requests = scheduleTaskRequestCaptor.getValue();
+ assertNotNull(requests);
+ assertEquals(1, requests.size());
+ assertEquals(0, requests.get(0).getTaskIndex());
+ }
+
+ @Test(timeout = 5000)
+ public void testStartAfterReconfigured() throws Exception {
+ vertexManager.onVertexStateUpdated(new VertexStateUpdate("v0", VertexState.CONFIGURED));
+ vertexManager.onVertexStateUpdated(new VertexStateUpdate("v1", VertexState.CONFIGURED));
+
+ List<TaskAttemptIdentifier> completion = new ArrayList<>();
+ completion.add(allCompletions.get(0));
+ completion.add(allCompletions.get(2));
+ vertexManager.onVertexStarted(completion);
+ verify(context, times(1)).scheduleTasks(scheduleTaskRequestCaptor.capture());
+ List<ScheduleTaskRequest> requests = scheduleTaskRequestCaptor.getValue();
+ assertNotNull(requests);
+ assertEquals(1, requests.size());
+ assertEquals(0, requests.get(0).getTaskIndex());
+ }
+
+ @Test(timeout = 5000)
+ public void testStartBeforeReconfigured() throws Exception {
+ vertexManager.onVertexStarted(allCompletions);
+ verify(context, never()).scheduleTasks(Matchers.<List<ScheduleTaskRequest>>any());
+ }
+
+ @Test(timeout = 5000)
+ public void testZeroSrcTask() throws Exception {
+ context = mock(VertexManagerPluginContext.class);
+ vertexManager = new CartesianProductVertexManagerUnpartitioned(context);
+ when(context.getVertexNumTasks(eq("v0"))).thenReturn(2);
+ when(context.getVertexNumTasks(eq("v1"))).thenReturn(0);
+
+ CartesianProductVertexManagerConfig config =
+ new CartesianProductVertexManagerConfig(false, new String[]{"v0","v1"}, null, 0, 0, null);
+ vertexManager.initialize(config);
+ allCompletions = new ArrayList<>();
+ allCompletions.add(new TaskAttemptIdentifierImpl("dag", "v0",
+ TezTaskAttemptID.getInstance(TezTaskID.getInstance(TezVertexID.getInstance(
+ TezDAGID.getInstance("0", 0, 0), 0), 0), 0)));
+ allCompletions.add(new TaskAttemptIdentifierImpl("dag", "v0",
+ TezTaskAttemptID.getInstance(TezTaskID.getInstance(TezVertexID.getInstance(
+ TezDAGID.getInstance("0", 0, 0), 0), 0), 1)));
+
+ vertexManager.onVertexStarted(new ArrayList<TaskAttemptIdentifier>());
+ vertexManager.onVertexStateUpdated(new VertexStateUpdate("v0", VertexState.CONFIGURED));
+ vertexManager.onVertexStateUpdated(new VertexStateUpdate("v1", VertexState.CONFIGURED));
+ vertexManager.onSourceTaskCompleted(allCompletions.get(0));
+ vertexManager.onSourceTaskCompleted(allCompletions.get(1));
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/tez/blob/1a068b23/tez-tests/src/test/java/org/apache/tez/test/TestFaultTolerance.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestFaultTolerance.java b/tez-tests/src/test/java/org/apache/tez/test/TestFaultTolerance.java
index 2d10f94..764ef0f 100644
--- a/tez-tests/src/test/java/org/apache/tez/test/TestFaultTolerance.java
+++ b/tez-tests/src/test/java/org/apache/tez/test/TestFaultTolerance.java
@@ -19,8 +19,17 @@
package org.apache.tez.test;
import java.io.IOException;
+import java.util.Arrays;
import java.util.Random;
+import org.apache.tez.common.TezUtils;
+import org.apache.tez.dag.api.EdgeManagerPluginDescriptor;
+import org.apache.tez.dag.api.ProcessorDescriptor;
+import org.apache.tez.dag.api.UserPayload;
+import org.apache.tez.dag.api.VertexManagerPluginDescriptor;
+import org.apache.tez.runtime.library.cartesianproduct.CartesianProductConfig;
+import org.apache.tez.runtime.library.cartesianproduct.CartesianProductEdgeManager;
+import org.apache.tez.runtime.library.cartesianproduct.CartesianProductVertexManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -778,5 +787,68 @@ public class TestFaultTolerance {
// dag will fail with 2 attempts failing from vertex v1
runDAGAndVerify(dag, DAGStatus.State.FAILED, 2, "no progress");
}
-
+
+ /**
+  * Fault-tolerance check for the unpartitioned cartesian product edge.
+  *
+  * In an unpartitioned cartesian product, the failure fraction of a source task's output should
+  * be #unique failures / #consumers that depend on that source task. This test builds a 2x2
+  * cartesian product (v1 x v2 -> v3) and lets an input of the 4th destination task (index 3)
+  * fail. With a failure fraction limit of 0.25, the fraction that matters is 1/2, not 1/4.
+  * The DAG is expected to end in SUCCEEDED state.
+  * @throws Exception
+  */
+ @Test
+ public void testCartesianProduct() throws Exception {
+   // NOTE(review): this configuration is populated but never referenced again in this method;
+   // confirm that the 0.25 limit actually reaches the DAG.
+   Configuration failureLimitConf = new Configuration();
+   failureLimitConf.setDouble(
+       TezConfiguration.TEZ_TASK_MAX_ALLOWED_OUTPUT_FAILURES_FRACTION, 0.25);
+
+   // one processor descriptor shared by all three vertices; its settings are keyed to "v3"
+   Configuration processorConf = new Configuration();
+   processorConf.setInt(TestProcessor.getVertexConfName(
+       TestProcessor.TEZ_FAILING_PROCESSOR_VERIFY_TASK_INDEX, "v3"), 3);
+   processorConf.setInt(TestProcessor.getVertexConfName(
+       TestProcessor.TEZ_FAILING_PROCESSOR_VERIFY_VALUE, "v3"), 5);
+   UserPayload processorPayload = TezUtils.createUserPayloadFromConf(processorConf);
+   ProcessorDescriptor testProcessor =
+       ProcessorDescriptor.create(TestProcessor.class.getName()).setUserPayload(processorPayload);
+
+   // two source vertices with 2 tasks each; v3 is created without an explicit task count
+   Vertex srcA = Vertex.create("v1", testProcessor, 2);
+   Vertex srcB = Vertex.create("v2", testProcessor, 2);
+   Vertex product = Vertex.create("v3", testProcessor);
+
+   // the same cartesian product payload configures both the vertex manager and the edge manager
+   UserPayload productPayload = new CartesianProductConfig(Arrays.asList("v1", "v2"))
+       .toUserPayload(new TezConfiguration());
+   product.setVertexManagerPlugin(
+       VertexManagerPluginDescriptor.create(CartesianProductVertexManager.class.getName())
+           .setUserPayload(productPayload));
+   EdgeManagerPluginDescriptor productEdgeManager =
+       EdgeManagerPluginDescriptor.create(CartesianProductEdgeManager.class.getName())
+           .setUserPayload(productPayload);
+
+   // failing-input settings for v3: task index 3, task attempt 0, input index 0
+   Configuration failingInputConf = new Configuration();
+   failingInputConf.setBoolean(
+       TestInput.getVertexConfName(TestInput.TEZ_FAILING_INPUT_DO_FAIL, "v3"), true);
+   failingInputConf.setInt(
+       TestInput.getVertexConfName(TestInput.TEZ_FAILING_INPUT_FAILING_TASK_INDEX, "v3"), 3);
+   failingInputConf.setInt(
+       TestInput.getVertexConfName(TestInput.TEZ_FAILING_INPUT_FAILING_TASK_ATTEMPT, "v3"), 0);
+   failingInputConf.setInt(
+       TestInput.getVertexConfName(TestInput.TEZ_FAILING_INPUT_FAILING_INPUT_INDEX, "v3"), 0);
+   failingInputConf.setInt(
+       TestInput.getVertexConfName(
+           TestInput.TEZ_FAILING_INPUT_FAILING_UPTO_INPUT_ATTEMPT, "v3"), 0);
+   UserPayload failingInputPayload = TezUtils.createUserPayloadFromConf(failingInputConf);
+
+   // both edges into v3 share one custom edge property
+   EdgeProperty productEdge =
+       EdgeProperty.create(productEdgeManager, DataMovementType.CUSTOM,
+           DataSourceType.PERSISTED, SchedulingType.SEQUENTIAL, TestOutput.getOutputDesc(null),
+           TestInput.getInputDesc(failingInputPayload));
+   Edge edgeFromA = Edge.create(srcA, product, productEdge);
+   Edge edgeFromB = Edge.create(srcB, product, productEdge);
+
+   DAG dag = DAG.create("dag");
+   dag.addVertex(srcA).addVertex(srcB).addVertex(product);
+   dag.addEdge(edgeFromA).addEdge(edgeFromB);
+
+   // run dag
+   runDAGAndVerify(dag, DAGStatus.State.SUCCEEDED);
+ }
}
[2/2] tez git commit: TEZ-3230. Implement vertex manager and edge
manager of cartesian product edge. (Zhiyuan Yang via mingma)
Posted by mi...@apache.org.
TEZ-3230. Implement vertex manager and edge manager of cartesian product edge. (Zhiyuan Yang via mingma)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/1a068b23
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/1a068b23
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/1a068b23
Branch: refs/heads/master
Commit: 1a068b2391684563bb53a0720848b7673d8dc46c
Parents: af82469
Author: Ming Ma <mi...@twitter.com>
Authored: Tue Sep 6 10:49:50 2016 -0700
Committer: Ming Ma <mi...@twitter.com>
Committed: Tue Sep 6 10:49:50 2016 -0700
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../tez/dag/app/dag/impl/TaskAttemptImpl.java | 3 +-
.../apache/tez/examples/CartesianProduct.java | 208 ++++++++++++++
tez-runtime-library/findbugs-exclude.xml | 18 ++
tez-runtime-library/pom.xml | 1 +
.../CartesianProductCombination.java | 164 +++++++++++
.../CartesianProductConfig.java | 255 +++++++++++++++++
.../CartesianProductEdgeManager.java | 106 +++++++
.../CartesianProductEdgeManagerConfig.java | 64 +++++
.../CartesianProductEdgeManagerPartitioned.java | 124 ++++++++
.../CartesianProductEdgeManagerReal.java | 62 ++++
...artesianProductEdgeManagerUnpartitioned.java | 98 +++++++
.../CartesianProductFilter.java | 47 +++
.../CartesianProductFilterDescriptor.java | 28 ++
.../CartesianProductVertexManager.java | 139 +++++++++
.../CartesianProductVertexManagerConfig.java | 75 +++++
...artesianProductVertexManagerPartitioned.java | 176 ++++++++++++
.../CartesianProductVertexManagerReal.java | 50 ++++
...tesianProductVertexManagerUnpartitioned.java | 178 ++++++++++++
.../main/proto/CartesianProductPayload.proto | 31 ++
.../TestCartesianProductCombination.java | 110 +++++++
.../TestCartesianProductConfig.java | 106 +++++++
.../TestCartesianProductEdgeManager.java | 68 +++++
...tCartesianProductEdgeManagerPartitioned.java | 284 +++++++++++++++++++
...artesianProductEdgeManagerUnpartitioned.java | 240 ++++++++++++++++
.../TestCartesianProductVertexManager.java | 67 +++++
...artesianProductVertexManagerPartitioned.java | 230 +++++++++++++++
...tesianProductVertexManagerUnpartitioned.java | 194 +++++++++++++
.../org/apache/tez/test/TestFaultTolerance.java | 74 ++++-
29 files changed, 3198 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/1a068b23/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index b73dd3f..0225db6 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -7,6 +7,7 @@ INCOMPATIBLE CHANGES
ALL CHANGES:
+ TEZ-3230. Implement vertex manager and edge manager of cartesian product edge.
TEZ-3326. Display JVM system properties in AM and task logs.
TEZ-3009. Errors that occur during container task acquisition are not logged.
TEZ-2852. TestVertexImpl fails due to race in AsyncDispatcher.
http://git-wip-us.apache.org/repos/asf/tez/blob/1a068b23/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
index e39315b..e5f3e71 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
@@ -1705,8 +1705,7 @@ public class TaskAttemptImpl implements TaskAttempt,
}
int readErrorTimespanSec = (int)((time - firstErrReportTime)/1000);
- boolean crossTimeDeadline = readErrorTimespanSec >=
- MAX_ALLOWED_TIME_FOR_TASK_READ_ERROR_SEC ? true : false;
+ boolean crossTimeDeadline = readErrorTimespanSec >= MAX_ALLOWED_TIME_FOR_TASK_READ_ERROR_SEC;
float failureFraction = ((float) attempt.uniquefailedOutputReports.size())
/ outputFailedEvent.getConsumerTaskNumber();
http://git-wip-us.apache.org/repos/asf/tez/blob/1a068b23/tez-examples/src/main/java/org/apache/tez/examples/CartesianProduct.java
----------------------------------------------------------------------
diff --git a/tez-examples/src/main/java/org/apache/tez/examples/CartesianProduct.java b/tez-examples/src/main/java/org/apache/tez/examples/CartesianProduct.java
new file mode 100644
index 0000000..9f3d490
--- /dev/null
+++ b/tez-examples/src/main/java/org/apache/tez/examples/CartesianProduct.java
@@ -0,0 +1,208 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.examples;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.tez.client.TezClient;
+import org.apache.tez.dag.api.DAG;
+import org.apache.tez.dag.api.Edge;
+import org.apache.tez.dag.api.EdgeManagerPluginDescriptor;
+import org.apache.tez.dag.api.EdgeProperty;
+import org.apache.tez.dag.api.ProcessorDescriptor;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.UserPayload;
+import org.apache.tez.dag.api.Vertex;
+import org.apache.tez.dag.api.VertexManagerPluginDescriptor;
+import org.apache.tez.mapreduce.input.MRInput;
+import org.apache.tez.mapreduce.output.MROutput;
+import org.apache.tez.mapreduce.processor.SimpleMRProcessor;
+import org.apache.tez.runtime.api.ProcessorContext;
+import org.apache.tez.runtime.library.api.KeyValueReader;
+import org.apache.tez.runtime.library.api.KeyValueWriter;
+import org.apache.tez.runtime.library.api.Partitioner;
+import org.apache.tez.runtime.library.cartesianproduct.CartesianProductConfig;
+import org.apache.tez.runtime.library.cartesianproduct.CartesianProductEdgeManager;
+import org.apache.tez.runtime.library.cartesianproduct.CartesianProductVertexManager;
+import org.apache.tez.runtime.library.conf.UnorderedKVEdgeConfig;
+import org.apache.tez.runtime.library.conf.UnorderedPartitionedKVEdgeConfig;
+import org.apache.tez.runtime.library.processor.SimpleProcessor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.StringTokenizer;
+
+/**
+ * This job has three vertices: two Tokenizers and one JoinProcessor. Each Tokenizer handles one
+ * input directory and generates tokens. In partitioned mode, CustomPartitioner separates tokens
+ * into 2 partitions according to the parity of token's first char. Then JoinProcessor does
+ * cartesian product of the (partitioned) token sets.
+ *
+ * Usage: [-partitioned|-unpartitioned] &lt;input_dir1&gt; &lt;input_dir2&gt; &lt;output_dir&gt;
+ */
+public class CartesianProduct extends TezExampleBase {
+  private static final String INPUT = "Input1";
+  private static final String OUTPUT = "Output";
+  private static final String VERTEX1 = "Vertex1";
+  private static final String VERTEX2 = "Vertex2";
+  private static final String VERTEX3 = "Vertex3";
+  private static final String PARTITIONED = "-partitioned";
+  private static final String UNPARTITIONED = "-unpartitioned";
+  private static final Logger LOG = LoggerFactory.getLogger(CartesianProduct.class);
+  // number of partitions each source vertex produces in partitioned mode
+  private static final int numPartition = 2;
+  private static final String[] sourceVertices = new String[] {VERTEX1, VERTEX2};
+
+  /**
+   * Splits every input value into tokens and emits each token as a key with the constant
+   * value 1.
+   */
+  public static class TokenProcessor extends SimpleProcessor {
+    public TokenProcessor(ProcessorContext context) {
+      super(context);
+    }
+
+    /**
+     * Reads records from the input named {@code INPUT}, tokenizes each value with a
+     * {@link StringTokenizer} and writes (token, 1) to the output named {@code VERTEX3}.
+     * Requires exactly one input and exactly one output.
+     */
+    @Override
+    public void run() throws Exception {
+      Preconditions.checkArgument(getInputs().size() == 1);
+      Preconditions.checkArgument(getOutputs().size() == 1);
+      KeyValueReader kvReader = (KeyValueReader) getInputs().get(INPUT).getReader();
+      KeyValueWriter kvWriter = (KeyValueWriter) getOutputs().get(VERTEX3).getWriter();
+      while (kvReader.next()) {
+        StringTokenizer itr = new StringTokenizer(kvReader.getCurrentValue().toString());
+        while (itr.hasMoreTokens()) {
+          kvWriter.write(new Text(itr.nextToken()), new IntWritable(1));
+        }
+      }
+    }
+  }
+
+  /**
+   * Emits every (left key, right key) pair of the keys received from the two source vertices.
+   */
+  public static class JoinProcessor extends SimpleMRProcessor {
+    public JoinProcessor(ProcessorContext context) {
+      super(context);
+    }
+
+    /**
+     * Collects all keys read from {@code VERTEX2} into a set, then writes one (left, right)
+     * record to {@code OUTPUT} for every key read from {@code VERTEX1} combined with every
+     * element of that set. Only keys are used; the values of both inputs are ignored.
+     */
+    @Override
+    public void run() throws Exception {
+      KeyValueWriter kvWriter = (KeyValueWriter) getOutputs().get(OUTPUT).getWriter();
+      KeyValueReader kvReader1 = (KeyValueReader) getInputs().get(VERTEX1).getReader();
+      KeyValueReader kvReader2 = (KeyValueReader) getInputs().get(VERTEX2).getReader();
+      // a set, so a key that occurs several times on the right side is kept only once
+      Set<String> rightSet = new HashSet<>();
+
+      while (kvReader2.next()) {
+        rightSet.add(kvReader2.getCurrentKey().toString());
+      }
+
+      while (kvReader1.next()) {
+        String left = kvReader1.getCurrentKey().toString();
+        for (String right : rightSet) {
+          kvWriter.write(left, right);
+        }
+      }
+    }
+  }
+
+  /**
+   * Assigns a key to a partition by the char code of its first character modulo
+   * {@code numPartition}.
+   */
+  public static class CustomPartitioner implements Partitioner {
+    @Override
+    public int getPartition(Object key, Object value, int numPartitions) {
+      // uses the class-level numPartition constant; the numPartitions argument is not consulted
+      return key.toString().charAt(0) % numPartition;
+    }
+  }
+
+  /**
+   * Builds the DAG: two tokenizer vertices, each reading one input path, connected by custom
+   * cartesian product edges to a join vertex that writes to the output path.
+   *
+   * @param tezConf configuration used for the data source/sink builders and the payload
+   * @param inputPath1 input path of the first tokenizer vertex
+   * @param inputPath2 input path of the second tokenizer vertex
+   * @param outputPath output path of the join vertex
+   * @param isPartitioned true to build the partitioned variant (partitioned edges using
+   *                      {@link CustomPartitioner}), false for the unpartitioned variant
+   * @return the assembled DAG
+   */
+  private DAG createDAG(TezConfiguration tezConf, String inputPath1, String inputPath2,
+                        String outputPath, boolean isPartitioned) throws IOException {
+    Vertex v1 = Vertex.create(VERTEX1, ProcessorDescriptor.create(TokenProcessor.class.getName()));
+    // turn off groupSplit so that each input file incurs one task
+    v1.addDataSource(INPUT,
+      MRInput.createConfigBuilder(new Configuration(tezConf), TextInputFormat.class, inputPath1)
+             .groupSplits(false).build());
+    Vertex v2 = Vertex.create(VERTEX2, ProcessorDescriptor.create(TokenProcessor.class.getName()));
+    v2.addDataSource(INPUT,
+      MRInput.createConfigBuilder(new Configuration(tezConf), TextInputFormat.class, inputPath2)
+              .groupSplits(false).build());
+    CartesianProductConfig cartesianProductConfig;
+    if (isPartitioned) {
+      Map<String, Integer> vertexPartitionMap = new HashMap<>();
+      for (String vertex : sourceVertices) {
+        vertexPartitionMap.put(vertex, numPartition);
+      }
+      cartesianProductConfig = new CartesianProductConfig(vertexPartitionMap);
+    } else {
+      cartesianProductConfig = new CartesianProductConfig(Arrays.asList(sourceVertices));
+    }
+    // the same payload configures both the vertex manager and the edge manager
+    UserPayload userPayload = cartesianProductConfig.toUserPayload(tezConf);
+    Vertex v3 = Vertex.create(VERTEX3, ProcessorDescriptor.create(JoinProcessor.class.getName()));
+    v3.addDataSink(OUTPUT,
+      MROutput.createConfigBuilder(new Configuration(tezConf), TextOutputFormat.class, outputPath)
+              .build());
+    v3.setVertexManagerPlugin(
+      VertexManagerPluginDescriptor.create(CartesianProductVertexManager.class.getName())
+                                   .setUserPayload(userPayload));
+
+    DAG dag = DAG.create("CrossProduct").addVertex(v1).addVertex(v2).addVertex(v3);
+    EdgeManagerPluginDescriptor edgeManagerDescriptor =
+      EdgeManagerPluginDescriptor.create(CartesianProductEdgeManager.class.getName());
+    edgeManagerDescriptor.setUserPayload(userPayload);
+    EdgeProperty edgeProperty;
+    if (isPartitioned) {
+      UnorderedPartitionedKVEdgeConfig edgeConf =
+        UnorderedPartitionedKVEdgeConfig.newBuilder(Text.class.getName(), IntWritable.class.getName(),
+          CustomPartitioner.class.getName()).build();
+      edgeProperty = edgeConf.createDefaultCustomEdgeProperty(edgeManagerDescriptor);
+    } else {
+      UnorderedKVEdgeConfig edgeConf =
+        UnorderedKVEdgeConfig.newBuilder(Text.class.getName(), IntWritable.class.getName()).build();
+      edgeProperty = edgeConf.createDefaultCustomEdgeProperty(edgeManagerDescriptor);
+    }
+    dag.addEdge(Edge.create(v1, v3, edgeProperty)).addEdge(Edge.create(v2, v3, edgeProperty));
+
+    return dag;
+  }
+
+  /**
+   * Prints the expected command line arguments to stderr.
+   */
+  @Override
+  protected void printUsage() {
+    // the mode flag group is closed with ']' so that the usage line is well formed
+    System.err.println("Usage: args: [" + PARTITIONED + "|" + UNPARTITIONED
+      + "] <input_dir1> <input_dir2> <output_dir>");
+  }
+
+  /**
+   * Accepts exactly four arguments whose first one is either mode flag.
+   *
+   * @return 0 when the arguments are valid, -1 otherwise
+   */
+  @Override
+  protected int validateArgs(String[] otherArgs) {
+    return (otherArgs.length != 4 || (!otherArgs[0].equals(PARTITIONED)
+      && !otherArgs[0].equals(UNPARTITIONED))) ? -1 : 0;
+  }
+
+  /**
+   * Builds the DAG from the arguments (args[0] = mode flag, args[1] and args[2] = input paths,
+   * args[3] = output path) and runs it.
+   *
+   * @return the result of {@code runDag}
+   */
+  @Override
+  protected int runJob(String[] args, TezConfiguration tezConf,
+                       TezClient tezClient) throws Exception {
+    DAG dag = createDAG(tezConf, args[1], args[2],
+      args[3], args[0].equals(PARTITIONED));
+    return runDag(dag, isCountersLog(), LOG);
+  }
+
+  public static void main(String[] args) throws Exception {
+    int res = ToolRunner.run(new Configuration(), new CartesianProduct(), args);
+    System.exit(res);
+  }
+}
+
http://git-wip-us.apache.org/repos/asf/tez/blob/1a068b23/tez-runtime-library/findbugs-exclude.xml
----------------------------------------------------------------------
diff --git a/tez-runtime-library/findbugs-exclude.xml b/tez-runtime-library/findbugs-exclude.xml
index 4e15edc..d3b6245 100644
--- a/tez-runtime-library/findbugs-exclude.xml
+++ b/tez-runtime-library/findbugs-exclude.xml
@@ -123,6 +123,24 @@
</Match>
<Match>
+ <Class name="org.apache.tez.runtime.library.cartesianproduct.CartesianProductUserPayload$CartesianProductConfigProto"/>
+ <Field name="unknownFields"/>
+ <Bug pattern="SE_BAD_FIELD"/>
+ </Match>
+
+ <Match>
+ <Class name="org.apache.tez.runtime.library.cartesianproduct.CartesianProductUserPayload$CartesianProductConfigProto"/>
+ <Field name="PARSER"/>
+ <Bug pattern="MS_SHOULD_BE_FINAL"/>
+ </Match>
+
+ <Match>
+ <Class name="org.apache.tez.runtime.library.cartesianproduct.CartesianProductUserPayload$CartesianProductConfigProto$Builder"/>
+ <Method name="maybeForceBuilderInitialization"/>
+ <Bug pattern="UCF_USELESS_CONTROL_FLOW"/>
+ </Match>
+
+ <Match>
<Bug pattern="EI_EXPOSE_REP"/>
<Or>
<Class name="org.apache.tez.runtime.library.common.sort.impl.ExteralSorter" />
http://git-wip-us.apache.org/repos/asf/tez/blob/1a068b23/tez-runtime-library/pom.xml
----------------------------------------------------------------------
diff --git a/tez-runtime-library/pom.xml b/tez-runtime-library/pom.xml
index 9831e50..b676933 100644
--- a/tez-runtime-library/pom.xml
+++ b/tez-runtime-library/pom.xml
@@ -129,6 +129,7 @@
<directory>${basedir}/src/main/proto</directory>
<includes>
<include>ShufflePayloads.proto</include>
+ <include>CartesianProductPayload.proto</include>
</includes>
</source>
<output>${project.build.directory}/generated-sources/java</output>
http://git-wip-us.apache.org/repos/asf/tez/blob/1a068b23/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductCombination.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductCombination.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductCombination.java
new file mode 100644
index 0000000..a46993d
--- /dev/null
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductCombination.java
@@ -0,0 +1,164 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.runtime.library.cartesianproduct;
+
+import com.google.common.base.Preconditions;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Represents one combination of source partitions (partitioned case) or source tasks
+ * (unpartitioned case), holding one index per source vertex.
+ *
+ * For example, if we have two source vertices and each generates two partitions, we will have
+ * 2*2=4 destination tasks. The mapping from source partition/task to destination task is:
+ * <0, 0> -> 0, <0, 1> -> 1, <1, 0> -> 2, <1, 1> -> 3;
+ *
+ * It stores the source partition/task combination and can compute the corresponding destination
+ * task. It can also figure out the source combination from a given destination task. Task ids
+ * are assigned in the ascending order of combinations, starting from 0. <field>factor</field>
+ * is the helper array used to compute the task id: task id = (combination) dot-product (factor).
+ *
+ * You can traverse all combinations with <method>firstTask</method> and <method>nextTask</method>,
+ * like <0, 0> -> <0, 1> -> <1, 0> -> <1, 1>.
+ *
+ * Or you can traverse all combinations that contain one specific partition with
+ * <method>firstTaskWithFixedPartition</method> and <method>nextTaskWithFixedPartition</method>,
+ * like <0, 1, 0> -> <0, 1, 1> -> <1, 1, 0> -> <1, 1, 1> (all combinations with the 2nd vertex's
+ * 2nd partition).
+ *
+ * The combination elements are unset until <method>firstTask</method> or
+ * <method>firstTaskWithFixedPartition</method> has been called (instances returned by
+ * <method>fromTaskId</method> are already populated).
+ */
+class CartesianProductCombination {
+  // numPartitions for partitioned case, numTasks for unpartitioned case
+  private final int[] numPartitionOrTask;
+  // at which position (in source vertices array) our vertex is; -1 when no position was given
+  private int positionId = -1;
+  // The i-th element Ci represents partition/task Ci of source vertex i.
+  private final Integer[] combination;
+  // the weight of each vertex when computing the task id
+  private final Integer[] factor;
+
+  /**
+   * @param numPartitionOrTask number of partitions (or tasks) of every source vertex; the array
+   *                           is copied, so later changes by the caller are not observed
+   * @throws IllegalArgumentException if the array is null or empty
+   */
+  public CartesianProductCombination(int[] numPartitionOrTask) {
+    Preconditions.checkArgument(numPartitionOrTask != null && numPartitionOrTask.length > 0,
+      "partition/task count array cannot be null or empty");
+    this.numPartitionOrTask = Arrays.copyOf(numPartitionOrTask, numPartitionOrTask.length);
+    combination = new Integer[numPartitionOrTask.length];
+    factor = new Integer[numPartitionOrTask.length];
+    // the last vertex has weight 1; every other weight is the product of all counts to its right
+    factor[factor.length - 1] = 1;
+    for (int i = factor.length - 2; i >= 0; i--) {
+      factor[i] = factor[i + 1] * numPartitionOrTask[i + 1];
+    }
+  }
+
+  /**
+   * @param numPartitionOrTask number of partitions (or tasks) of every source vertex
+   * @param positionId index of "our" vertex in the source vertices array, used by the
+   *                   fixed-partition traversal methods
+   */
+  public CartesianProductCombination(int[] numPartitionOrTask, int positionId) {
+    this(numPartitionOrTask);
+    this.positionId = positionId;
+  }
+
+  /**
+   * @return a read only view of current combination
+   */
+  public List<Integer> getCombination() {
+    return Collections.unmodifiableList(Arrays.asList(combination));
+  }
+
+  /**
+   * Moves to the first combination that has the given partition id in current position.
+   * @param partition partition (or task) id to pin at current position
+   * @throws IllegalArgumentException if no valid position id was given at construction or the
+   *                                  partition id is out of range for that position
+   */
+  public void firstTaskWithFixedPartition(int partition) {
+    checkPositionId();
+    Preconditions.checkArgument(partition >= 0 && partition < numPartitionOrTask[positionId],
+      "partition " + partition + " is out of range [0, " + numPartitionOrTask[positionId] + ")");
+    Arrays.fill(combination, 0);
+    combination[positionId] = partition;
+  }
+
+  /**
+   * Moves to the next combination that keeps the current partition in current position.
+   * @return false if there is no next combination
+   */
+  public boolean nextTaskWithFixedPartition() {
+    checkPositionId();
+    // find the rightmost element, other than the pinned one, that can still be incremented
+    int i;
+    for (i = combination.length - 1; i >= 0; i--) {
+      if (i != positionId && combination[i] != numPartitionOrTask[i] - 1) {
+        break;
+      }
+    }
+
+    if (i < 0) {
+      return false;
+    }
+
+    combination[i]++;
+
+    // reset everything to the right of it, leaving the pinned element untouched
+    for (i++; i < combination.length; i++) {
+      if (i != positionId) {
+        combination[i] = 0;
+      }
+    }
+
+    return true;
+  }
+
+  /**
+   * Moves to the very first combination, i.e. all zeros.
+   */
+  public void firstTask() {
+    Arrays.fill(combination, 0);
+  }
+
+  /**
+   * Moves to the next combination in ascending task id order.
+   * @return false if there is no next combination
+   */
+  public boolean nextTask() {
+    // find the rightmost element that can still be incremented
+    int i;
+    for (i = combination.length - 1; i >= 0; i--) {
+      if (combination[i] != numPartitionOrTask[i] - 1) {
+        break;
+      }
+    }
+
+    if (i < 0) {
+      return false;
+    }
+
+    combination[i]++;
+    Arrays.fill(combination, i + 1, combination.length, 0);
+    return true;
+  }
+
+  /**
+   * @return corresponding task id for current combination
+   */
+  public int getTaskId() {
+    int taskId = 0;
+    for (int i = 0; i < combination.length; i++) {
+      taskId += combination[i] * factor[i];
+    }
+    return taskId;
+  }
+
+  /**
+   * Builds the combination that corresponds to the given task id.
+   * @param numPartitionOrTask number of partitions (or tasks) of every source vertex
+   * @param taskId task id to decompose
+   * @return combination whose <method>getTaskId</method> equals the given task id
+   * @throws IllegalArgumentException if the task id is negative or not smaller than the product
+   *                                  of all partition/task counts
+   */
+  public static CartesianProductCombination fromTaskId(int[] numPartitionOrTask,
+                                                       int taskId) {
+    CartesianProductCombination result = new CartesianProductCombination(numPartitionOrTask);
+    int numTotalTasks = result.factor[0] * result.numPartitionOrTask[0];
+    Preconditions.checkArgument(taskId >= 0 && taskId < numTotalTasks,
+      "task id " + taskId + " is out of range [0, " + numTotalTasks + ")");
+    for (int i = 0; i < result.combination.length; i++) {
+      result.combination[i] = taskId / result.factor[i];
+      taskId %= result.factor[i];
+    }
+    return result;
+  }
+
+  // fixed-partition traversal is only meaningful when a valid position id was supplied
+  private void checkPositionId() {
+    Preconditions.checkArgument(positionId >= 0 && positionId < combination.length,
+      "invalid position id " + positionId);
+  }
+}
http://git-wip-us.apache.org/repos/asf/tez/blob/1a068b23/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductConfig.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductConfig.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductConfig.java
new file mode 100644
index 0000000..b682182
--- /dev/null
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductConfig.java
@@ -0,0 +1,255 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.runtime.library.cartesianproduct;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.primitives.Ints;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.InvalidProtocolBufferException;
+import org.apache.hadoop.classification.InterfaceStability.Evolving;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.UserPayload;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.tez.runtime.library.cartesianproduct.CartesianProductUserPayload.CartesianProductConfigProto;
+
+/**
+ * <class>CartesianProductConfig</class> is used to configure both
+ * <class>CartesianProductVertexManager</class> and <class>CartesianProductEdgeManager</class>.
+ * User needs to specify at least the source vertices and, in partitioned case, the number of
+ * partitions of each vertex's output. In partitioned case a filter may also be specified here
+ * (via <class>CartesianProductFilterDescriptor</class>). The min/max fractions used in slow start
+ * are read from the configuration passed to <method>toUserPayload</method>.
+ */
+@Evolving
+public class CartesianProductConfig {
+  private final boolean isPartitioned;
+  private final String[] sourceVertices;
+  // null in unpartitioned case
+  private final int[] numPartitions;
+  // null when no filter is configured
+  private final CartesianProductFilterDescriptor filterDescriptor;
+
+  /**
+   * create config for unpartitioned case
+   * @param sourceVertices list of source vertices names
+   */
+  public CartesianProductConfig(List<String> sourceVertices) {
+    Preconditions.checkArgument(sourceVertices != null, "source vertices list cannot be null");
+    Preconditions.checkArgument(sourceVertices.size() > 1,
+      "there must be more than 1 source " + "vertices, currently only " + sourceVertices.size());
+
+    this.isPartitioned = false;
+    this.sourceVertices = sourceVertices.toArray(new String[sourceVertices.size()]);
+    this.numPartitions = null;
+    this.filterDescriptor = null;
+  }
+
+  /**
+   * create config for partitioned case without filter
+   * @param vertexPartitionMap the map from vertex name to its number of partitions
+   */
+  public CartesianProductConfig(Map<String, Integer> vertexPartitionMap) {
+    this(vertexPartitionMap, null);
+  }
+
+  /**
+   * create config for partitioned case with filter
+   * @param vertexPartitionMap the map from vertex name to its number of partitions; the order of
+   *                           source vertices follows the iteration order of this map
+   * @param filterDescriptor descriptor of the filter to apply, or null for no filter
+   */
+  public CartesianProductConfig(Map<String, Integer> vertexPartitionMap,
+                                CartesianProductFilterDescriptor filterDescriptor) {
+    Preconditions.checkArgument(vertexPartitionMap != null, "vertex-partition map cannot be null");
+    Preconditions.checkArgument(vertexPartitionMap.size() > 1,
+      "there must be more than 1 source " + "vertices, currently only " + vertexPartitionMap.size());
+
+    this.isPartitioned = true;
+    this.numPartitions = new int[vertexPartitionMap.size()];
+    this.sourceVertices = new String[vertexPartitionMap.size()];
+    this.filterDescriptor = filterDescriptor;
+
+    int i = 0;
+    for (Map.Entry<String, Integer> entry : vertexPartitionMap.entrySet()) {
+      // fail with a descriptive message instead of a bare NPE when unboxing a null count
+      Preconditions.checkArgument(entry.getValue() != null,
+        "Vertex " + entry.getKey() + " has null partition count");
+      this.sourceVertices[i] = entry.getKey();
+      this.numPartitions[i] = entry.getValue();
+      i++;
+    }
+
+    checkNumPartitions();
+  }
+
+  /**
+   * create config for partitioned case, with specified source vertices order
+   * @param numPartitions number of partitions of each source vertex
+   * @param sourceVertices source vertices names, in the same order as numPartitions
+   * @param filterDescriptor descriptor of the filter to apply, or null for no filter
+   */
+  @VisibleForTesting
+  protected CartesianProductConfig(int[] numPartitions, String[] sourceVertices,
+                                   CartesianProductFilterDescriptor filterDescriptor) {
+    Preconditions.checkArgument(numPartitions != null, "partitions count array can't be null");
+    Preconditions.checkArgument(sourceVertices != null, "source vertices array can't be null");
+    Preconditions.checkArgument(numPartitions.length == sourceVertices.length,
+      "partitions count array(length: " + numPartitions.length + ") and source vertices array " +
+        "(length: " + sourceVertices.length + ") cannot have different length");
+    Preconditions.checkArgument(sourceVertices.length > 1,
+      "there must be more than 1 source " + "vertices, currently only " + sourceVertices.length);
+
+    this.isPartitioned = true;
+    this.numPartitions = numPartitions;
+    this.sourceVertices = sourceVertices;
+    this.filterDescriptor = filterDescriptor;
+
+    checkNumPartitions();
+  }
+
+  /**
+   * create config for both cases, used by subclass; no validation is performed here
+   */
+  protected CartesianProductConfig(boolean isPartitioned, int[] numPartitions,
+                                   String[] sourceVertices,
+                                   CartesianProductFilterDescriptor filterDescriptor) {
+    this.isPartitioned = isPartitioned;
+    this.numPartitions = numPartitions;
+    this.sourceVertices = sourceVertices;
+    this.filterDescriptor = filterDescriptor;
+  }
+
+  /**
+   * In partitioned case, every vertex must have a positive partition count and at least one
+   * vertex must have more than one partition. In unpartitioned case, partition counts must be
+   * absent.
+   */
+  @VisibleForTesting
+  protected void checkNumPartitions() {
+    if (isPartitioned) {
+      boolean isUnpartitioned = true;
+      for (int i = 0; i < numPartitions.length; i++) {
+        Preconditions.checkArgument(this.numPartitions[i] > 0,
+          "Vertex " + sourceVertices[i] + " has non-positive (" + numPartitions[i] +
+            ") partitions");
+        isUnpartitioned = isUnpartitioned && numPartitions[i] == 1;
+      }
+      Preconditions.checkArgument(!isUnpartitioned,
+        "every source vertex has 1 partition in a partitioned case");
+    } else {
+      Preconditions.checkArgument(this.numPartitions == null,
+        "partition counts should be null in unpartitioned case");
+    }
+  }
+
+  /**
+   * @return read only list of source vertices names
+   */
+  public List<String> getSourceVertices() {
+    return Collections.unmodifiableList(Arrays.asList(sourceVertices));
+  }
+
+  /**
+   * @return read only list of number of partitions, the order is same as result of
+   *         <method>getSourceVertices</method>; null if no partition counts are set
+   */
+  public List<Integer> getNumPartitions() {
+    if (this.numPartitions == null) {
+      return null;
+    }
+    return Collections.unmodifiableList(Ints.asList(this.numPartitions));
+  }
+
+  public boolean getIsPartitioned() {
+    return isPartitioned;
+  }
+
+  /**
+   * @return the filter descriptor, or null if no filter is configured
+   */
+  public CartesianProductFilterDescriptor getFilterDescriptor() {
+    return this.filterDescriptor;
+  }
+
+  /**
+   * Serializes this config, together with the slow start fractions taken from conf, into a
+   * user payload.
+   * @param conf configuration to read slow start fractions from; defaults are used if null
+   */
+  public UserPayload toUserPayload(TezConfiguration conf) throws IOException {
+    return UserPayload.create(ByteBuffer.wrap(toProto(conf).toByteArray()));
+  }
+
+  /**
+   * @param conf configuration to read slow start fractions from; defaults are used if null
+   * @return protobuf representation of this config
+   * @throws IllegalArgumentException if min fraction is larger than max fraction
+   */
+  protected CartesianProductConfigProto toProto(TezConfiguration conf) {
+    CartesianProductConfigProto.Builder builder =
+      CartesianProductConfigProto.newBuilder();
+    builder.setIsPartitioned(this.isPartitioned)
+      .addAllSourceVertices(Arrays.asList(sourceVertices));
+
+    if (isPartitioned) {
+      builder.addAllNumPartitions(Ints.asList(numPartitions));
+      if (filterDescriptor != null) {
+        builder.setFilterClassName(filterDescriptor.getClassName());
+        UserPayload filterUserPayload = filterDescriptor.getUserPayload();
+        if (filterUserPayload != null) {
+          builder.setFilterUserPayload(ByteString.copyFrom(filterUserPayload.getPayload()));
+        }
+      }
+    }
+
+    // slow start fractions come from conf when available, otherwise fall back to defaults
+    if (conf == null) {
+      builder.setMinFraction(
+        CartesianProductVertexManager.TEZ_CAERESIAN_PRODUCT_SLOW_START_MIN_FRACTION_DEFAULT);
+      builder.setMaxFraction(
+        CartesianProductVertexManager.TEZ_CAERESIAN_PRODUCT_SLOW_START_MAX_FRACTION_DEFAULT);
+    } else {
+      builder.setMinFraction(conf.getFloat(
+        CartesianProductVertexManager.TEZ_CAERESIAN_PRODUCT_SLOW_START_MIN_FRACTION,
+        CartesianProductVertexManager.TEZ_CAERESIAN_PRODUCT_SLOW_START_MIN_FRACTION_DEFAULT));
+      builder.setMaxFraction(conf.getFloat(
+        CartesianProductVertexManager.TEZ_CAERESIAN_PRODUCT_SLOW_START_MAX_FRACTION,
+        CartesianProductVertexManager.TEZ_CAERESIAN_PRODUCT_SLOW_START_MAX_FRACTION_DEFAULT));
+    }
+    Preconditions.checkArgument(builder.getMinFraction() <= builder.getMaxFraction(),
+      "min fraction(" + builder.getMinFraction() + ") should be less than max fraction(" +
+        builder.getMaxFraction() + ") in cartesian product slow start");
+
+    return builder.build();
+  }
+
+  /**
+   * @param payload user payload carrying a serialized config, must not be null or empty-handed
+   * @return parsed protobuf message
+   */
+  protected static CartesianProductConfigProto userPayloadToProto(UserPayload payload)
+    throws InvalidProtocolBufferException {
+    Preconditions.checkArgument(payload != null, "UserPayload is null");
+    Preconditions.checkArgument(payload.getPayload() != null, "UserPayload carreis null payload");
+    return
+      CartesianProductConfigProto.parseFrom(ByteString.copyFrom(payload.getPayload()));
+  }
+
+  protected static CartesianProductConfig fromUserPayload(UserPayload payload)
+    throws InvalidProtocolBufferException {
+    return fromProto(userPayloadToProto(payload));
+  }
+
+  /**
+   * Rebuilds a config from its protobuf form. Slow start fractions stored in the proto are not
+   * carried over into the returned object.
+   */
+  protected static CartesianProductConfig fromProto(
+    CartesianProductConfigProto proto) {
+    if (!proto.getIsPartitioned()) {
+      return new CartesianProductConfig(proto.getSourceVerticesList());
+    }
+
+    String[] sourceVertices = new String[proto.getSourceVerticesList().size()];
+    proto.getSourceVerticesList().toArray(sourceVertices);
+    CartesianProductFilterDescriptor filterDescriptor = null;
+    if (proto.hasFilterClassName()) {
+      filterDescriptor = new CartesianProductFilterDescriptor(proto.getFilterClassName());
+      if (proto.hasFilterUserPayload()) {
+        filterDescriptor.setUserPayload(
+          UserPayload.create(ByteBuffer.wrap(proto.getFilterUserPayload().toByteArray())));
+      }
+    }
+    return new CartesianProductConfig(Ints.toArray(proto.getNumPartitionsList()),
+      sourceVertices, filterDescriptor);
+  }
+}
http://git-wip-us.apache.org/repos/asf/tez/blob/1a068b23/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductEdgeManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductEdgeManager.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductEdgeManager.java
new file mode 100644
index 0000000..96cce94
--- /dev/null
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductEdgeManager.java
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.runtime.library.cartesianproduct;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import org.apache.tez.dag.api.EdgeManagerPluginContext;
+import org.apache.tez.dag.api.EdgeManagerPluginOnDemand;
+import org.apache.tez.dag.api.TezException;
+
+import javax.annotation.Nullable;
+
+/**
+ * Edge manager plugin for cartesian product edges. It is only a thin shell: during
+ * initialization it reads the config from the user payload, picks the partitioned or the
+ * unpartitioned implementation accordingly, and afterwards forwards every call to that
+ * implementation object.
+ */
+public class CartesianProductEdgeManager extends EdgeManagerPluginOnDemand {
+  // the actual implementation, created in initialize()
+  private CartesianProductEdgeManagerReal edgeManagerReal;
+
+  public CartesianProductEdgeManager(EdgeManagerPluginContext context) {
+    super(context);
+  }
+
+  /**
+   * Parses the user payload and creates the matching implementation.
+   * @throws IllegalArgumentException if the context carries no user payload
+   */
+  @Override
+  public void initialize() throws Exception {
+    Preconditions.checkArgument(getContext().getUserPayload() != null);
+    CartesianProductEdgeManagerConfig config =
+      CartesianProductEdgeManagerConfig.fromUserPayload(getContext().getUserPayload());
+    // no need to check config because config comes from VM and is already checked by VM
+    if (config.getIsPartitioned()) {
+      edgeManagerReal = new CartesianProductEdgeManagerPartitioned(getContext());
+    } else {
+      edgeManagerReal = new CartesianProductEdgeManagerUnpartitioned(getContext());
+    }
+    edgeManagerReal.initialize(config);
+  }
+
+  /**
+   * @return the implementation object calls are forwarded to; null before initialization
+   */
+  @VisibleForTesting
+  protected CartesianProductEdgeManagerReal getEdgeManagerReal() {
+    return edgeManagerReal;
+  }
+
+  @Override
+  public void prepareForRouting() throws Exception {
+    edgeManagerReal.prepareForRouting();
+  }
+
+  @Override
+  public int routeInputErrorEventToSource(int destTaskId, int failedInputId) throws Exception {
+    return edgeManagerReal.routeInputErrorEventToSource(destTaskId, failedInputId);
+  }
+
+  @Nullable
+  @Override
+  public EventRouteMetadata routeDataMovementEventToDestination(
+    int srcTaskId, int srcOutputId, int destTaskId) throws Exception {
+    return edgeManagerReal.routeDataMovementEventToDestination(srcTaskId, srcOutputId, destTaskId);
+  }
+
+  @Nullable
+  @Override
+  public EventRouteMetadata routeCompositeDataMovementEventToDestination(
+    int srcTaskId, int destTaskId) throws Exception {
+    return edgeManagerReal.routeCompositeDataMovementEventToDestination(srcTaskId, destTaskId);
+  }
+
+  @Nullable
+  @Override
+  public EventRouteMetadata routeInputSourceTaskFailedEventToDestination(
+    int srcTaskId, int destTaskId) throws Exception {
+    return edgeManagerReal.routeInputSourceTaskFailedEventToDestination(srcTaskId, destTaskId);
+  }
+
+  @Override
+  public int getNumDestinationTaskPhysicalInputs(int destTaskId) {
+    return edgeManagerReal.getNumDestinationTaskPhysicalInputs(destTaskId);
+  }
+
+  @Override
+  public int getNumSourceTaskPhysicalOutputs(int srcTaskId) {
+    return edgeManagerReal.getNumSourceTaskPhysicalOutputs(srcTaskId);
+  }
+
+  @Override
+  public int getNumDestinationConsumerTasks(int sourceTaskIndex) {
+    return edgeManagerReal.getNumDestinationConsumerTasks(sourceTaskIndex);
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/tez/blob/1a068b23/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductEdgeManagerConfig.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductEdgeManagerConfig.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductEdgeManagerConfig.java
new file mode 100644
index 0000000..d48a0bb
--- /dev/null
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductEdgeManagerConfig.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.runtime.library.cartesianproduct;
+
+import com.google.common.primitives.Ints;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.InvalidProtocolBufferException;
+import org.apache.tez.dag.api.UserPayload;
+
+import java.nio.ByteBuffer;
+
+import static org.apache.tez.runtime.library.cartesianproduct.CartesianProductUserPayload.CartesianProductConfigProto;
+
+/**
+ * Config used by the cartesian product edge manager. On top of what
+ * <class>CartesianProductConfig</class> holds, it carries the number of tasks of each source
+ * vertex.
+ */
+class CartesianProductEdgeManagerConfig extends CartesianProductConfig {
+  // number of tasks of each source vertex; null when the payload carries none
+  private final int[] numTasks;
+
+  protected CartesianProductEdgeManagerConfig(boolean isPartitioned, String[] sourceVertices,
+                                              int[] numPartitions, int[] numTasks,
+                                              CartesianProductFilterDescriptor filterDescriptor) {
+    super(isPartitioned, numPartitions, sourceVertices, filterDescriptor);
+    this.numTasks = numTasks;
+  }
+
+  /**
+   * @return the internal array of task counts (not a copy), or null if none were provided
+   */
+  public int[] getNumTasks() {
+    return this.numTasks;
+  }
+
+  /**
+   * Deserializes an edge manager config from the given user payload.
+   * @param payload user payload carrying a serialized <class>CartesianProductConfigProto</class>
+   * @return config built from the payload; partition counts and task counts are null when the
+   *         payload contains none, and the filter descriptor is null when the payload has no
+   *         filter class name
+   */
+  public static CartesianProductEdgeManagerConfig fromUserPayload(UserPayload payload)
+    throws InvalidProtocolBufferException {
+    CartesianProductConfigProto proto =
+      CartesianProductConfigProto.parseFrom(ByteString.copyFrom(payload.getPayload()));
+
+    boolean isPartitioned = proto.getIsPartitioned();
+    String[] sourceVertices = new String[proto.getSourceVerticesList().size()];
+    proto.getSourceVerticesList().toArray(sourceVertices);
+    int[] numPartitions =
+      proto.getNumPartitionsCount() == 0 ? null : Ints.toArray(proto.getNumPartitionsList());
+
+    // A filter payload is only meaningful together with a filter class. Checking the payload
+    // outside of this guard would dereference a null descriptor when only the payload is set.
+    CartesianProductFilterDescriptor filterDescriptor = null;
+    if (proto.hasFilterClassName()) {
+      filterDescriptor = new CartesianProductFilterDescriptor(proto.getFilterClassName());
+      if (proto.hasFilterUserPayload()) {
+        filterDescriptor.setUserPayload(
+          UserPayload.create(ByteBuffer.wrap(proto.getFilterUserPayload().toByteArray())));
+      }
+    }
+
+    int[] numTasks =
+      proto.getNumTasksCount() == 0 ? null : Ints.toArray(proto.getNumTasksList());
+    return new CartesianProductEdgeManagerConfig(isPartitioned, sourceVertices, numPartitions,
+      numTasks, filterDescriptor);
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/tez/blob/1a068b23/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductEdgeManagerPartitioned.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductEdgeManagerPartitioned.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductEdgeManagerPartitioned.java
new file mode 100644
index 0000000..644d5af
--- /dev/null
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductEdgeManagerPartitioned.java
@@ -0,0 +1,124 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.runtime.library.cartesianproduct;
+
+import com.google.common.primitives.Ints;
+import org.apache.tez.common.ReflectionUtils;
+import org.apache.tez.dag.api.EdgeManagerPluginContext;
+import org.apache.tez.dag.api.EdgeManagerPluginOnDemand.EventRouteMetadata;
+import org.apache.tez.dag.api.UserPayload;
+
+import javax.annotation.Nullable;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Edge manager implementation for the partitioned case. Destination tasks correspond to
+ * combinations of source partitions; when a filter is configured only the combinations accepted
+ * by the filter get a destination task, so real task ids are translated to "ideal" task ids
+ * (ids in the unfiltered cartesian product) through a lookup table built at initialization.
+ */
+class CartesianProductEdgeManagerPartitioned extends CartesianProductEdgeManagerReal {
+  // index of this edge's source vertex in the configured source vertices list
+  private int positionId;
+  // optional filter deciding which partition combinations are valid; null means accept all
+  private CartesianProductFilter filter;
+  // taskIdMapping[realTaskId] = task id of the same combination in the unfiltered product
+  private int[] taskIdMapping;
+  private CartesianProductEdgeManagerConfig config;
+  private int[] numPartitions;
+
+  public CartesianProductEdgeManagerPartitioned(EdgeManagerPluginContext context) {
+    super(context);
+  }
+
+  @Override
+  public void initialize(CartesianProductEdgeManagerConfig config) throws Exception {
+    this.config = config;
+    this.numPartitions = Ints.toArray(config.getNumPartitions());
+    this.positionId = config.getSourceVertices().indexOf(getContext().getSourceVertexName());
+
+    CartesianProductFilterDescriptor descriptor = config.getFilterDescriptor();
+    if (descriptor != null) {
+      // the filter class is instantiated reflectively through its UserPayload constructor
+      filter = ReflectionUtils.createClazzInstance(descriptor.getClassName(),
+        new Class[] {UserPayload.class}, new UserPayload[] {descriptor.getUserPayload()});
+    }
+    generateTaskIdMapping();
+  }
+
+  @Override
+  public int routeInputErrorEventToSource(int destTaskId, int failedInputId) throws Exception {
+    return failedInputId;
+  }
+
+  @Override
+  public EventRouteMetadata routeDataMovementEventToDestination(int srcTaskId, int srcOutputId,
+                                                                int destTaskId) throws Exception {
+    // only the output that matches the partition consumed by this destination task is routed
+    if (srcOutputId != getSourcePartition(destTaskId)) {
+      return null;
+    }
+    return EventRouteMetadata.create(1, new int[]{srcTaskId});
+  }
+
+  @Nullable
+  @Override
+  public EventRouteMetadata routeCompositeDataMovementEventToDestination(int srcTaskId,
+                                                                         int destTaskId)
+    throws Exception {
+    int sourcePartition = getSourcePartition(destTaskId);
+    return EventRouteMetadata.create(1, new int[]{srcTaskId}, new int[]{sourcePartition});
+  }
+
+  @Nullable
+  @Override
+  public EventRouteMetadata routeInputSourceTaskFailedEventToDestination(int srcTaskId,
+                                                                         int destTaskId)
+    throws Exception {
+    return EventRouteMetadata.create(1, new int[]{srcTaskId});
+  }
+
+  @Override
+  public int getNumDestinationTaskPhysicalInputs(int destTaskId) {
+    return getContext().getSourceVertexNumTasks();
+  }
+
+  @Override
+  public int getNumSourceTaskPhysicalOutputs(int srcTaskId) {
+    return numPartitions[positionId];
+  }
+
+  @Override
+  public int getNumDestinationConsumerTasks(int sourceTaskIndex) {
+    return getContext().getDestinationVertexNumTasks();
+  }
+
+  // Walks all partition combinations in ascending task id order and records the ideal task id
+  // of every combination that passes the filter (or of all of them when there is no filter).
+  private void generateTaskIdMapping() {
+    List<Integer> acceptedTaskIds = new ArrayList<>();
+    Map<String, Integer> partitionByVertex = new HashMap<>();
+    List<String> vertexNames = config.getSourceVertices();
+    CartesianProductCombination current = new CartesianProductCombination(numPartitions);
+    current.firstTask();
+    do {
+      List<Integer> partitions = current.getCombination();
+      for (int v = 0; v < vertexNames.size(); v++) {
+        partitionByVertex.put(vertexNames.get(v), partitions.get(v));
+      }
+      boolean accepted = filter == null || filter.isValidCombination(partitionByVertex);
+      if (accepted) {
+        acceptedTaskIds.add(current.getTaskId());
+      }
+    } while (current.nextTask());
+    this.taskIdMapping = Ints.toArray(acceptedTaskIds);
+  }
+
+  private int getIdealTaskId(int realTaskId) {
+    return taskIdMapping[realTaskId];
+  }
+
+  // partition of this edge's source vertex that the given destination task consumes
+  private int getSourcePartition(int destTaskId) {
+    CartesianProductCombination combination =
+      CartesianProductCombination.fromTaskId(numPartitions, getIdealTaskId(destTaskId));
+    return combination.getCombination().get(positionId);
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/tez/blob/1a068b23/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductEdgeManagerReal.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductEdgeManagerReal.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductEdgeManagerReal.java
new file mode 100644
index 0000000..705db05
--- /dev/null
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductEdgeManagerReal.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.runtime.library.cartesianproduct;
+
+import org.apache.tez.dag.api.EdgeManagerPluginContext;
+import org.apache.tez.dag.api.EdgeManagerPluginOnDemand.EventRouteMetadata;
+
+/**
+ * Base class of the cartesian product edge manager implementations. Apart from
+ * <method>initialize</method>, which takes the parsed config, and
+ * <method>prepareForRouting</method>, which does nothing by default, every method is abstract
+ * and has the same name and parameters as the method <class>CartesianProductEdgeManager</class>
+ * forwards to it.
+ */
+abstract class CartesianProductEdgeManagerReal {
+  private final EdgeManagerPluginContext context;
+
+  public CartesianProductEdgeManagerReal(EdgeManagerPluginContext context) {
+    this.context = context;
+  }
+
+  /**
+   * @return the plugin context this implementation was created with
+   */
+  public EdgeManagerPluginContext getContext() {
+    return context;
+  }
+
+  /**
+   * Sets up this implementation from the given config.
+   */
+  public abstract void initialize(CartesianProductEdgeManagerConfig config) throws Exception;
+
+  /**
+   * Hook invoked before routing starts; does nothing unless overridden.
+   */
+  public void prepareForRouting() throws Exception {
+  }
+
+  public abstract int routeInputErrorEventToSource(int destTaskId, int failedInputId)
+    throws Exception;
+
+  public abstract EventRouteMetadata routeDataMovementEventToDestination(
+    int srcTaskId, int srcOutputId, int destTaskId) throws Exception;
+
+  public abstract EventRouteMetadata routeCompositeDataMovementEventToDestination(
+    int srcTaskId, int destTaskId) throws Exception;
+
+  public abstract EventRouteMetadata routeInputSourceTaskFailedEventToDestination(
+    int srcTaskId, int destTaskId) throws Exception;
+
+  public abstract int getNumDestinationTaskPhysicalInputs(int destTaskId);
+
+  public abstract int getNumSourceTaskPhysicalOutputs(int srcTaskId);
+
+  public abstract int getNumDestinationConsumerTasks(int sourceTaskIndex);
+}
http://git-wip-us.apache.org/repos/asf/tez/blob/1a068b23/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductEdgeManagerUnpartitioned.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductEdgeManagerUnpartitioned.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductEdgeManagerUnpartitioned.java
new file mode 100644
index 0000000..cea4142
--- /dev/null
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductEdgeManagerUnpartitioned.java
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.runtime.library.cartesianproduct;
+
+import org.apache.tez.dag.api.EdgeManagerPluginContext;
+import org.apache.tez.dag.api.EdgeManagerPluginOnDemand;
+
+import javax.annotation.Nullable;
+import java.util.Arrays;
+
+import static org.apache.tez.dag.api.EdgeManagerPluginOnDemand.*;
+
+class CartesianProductEdgeManagerUnpartitioned extends CartesianProductEdgeManagerReal {
+ private int positionId;
+ private int[] numTasks;
+ private int numDestinationConsumerTasks;
+
+ public CartesianProductEdgeManagerUnpartitioned(EdgeManagerPluginContext context) {
+ super(context);
+ }
+
+ public void initialize(CartesianProductEdgeManagerConfig config) {
+ positionId = config.getSourceVertices().indexOf(getContext().getSourceVertexName());
+ this.numTasks = config.getNumTasks();
+
+ if (numTasks != null && numTasks[positionId] != 0) {
+ numDestinationConsumerTasks = 1;
+ for (int numTask : numTasks) {
+ numDestinationConsumerTasks *= numTask;
+ }
+ numDestinationConsumerTasks /= numTasks[positionId];
+ }
+ }
+
+ @Override
+ public int routeInputErrorEventToSource(int destTaskId, int failedInputId) throws Exception {
+ return
+ CartesianProductCombination.fromTaskId(numTasks, destTaskId).getCombination().get(positionId);
+ }
+
+ @Override
+ public EventRouteMetadata routeDataMovementEventToDestination(int srcTaskId, int srcOutputId,
+ int destTaskId) throws Exception {
+ int index = CartesianProductCombination.fromTaskId(numTasks, destTaskId)
+ .getCombination().get(positionId);
+ return index == srcTaskId ? EventRouteMetadata.create(1, new int[]{0}) : null;
+ }
+
+ @Nullable
+ @Override
+ public EventRouteMetadata routeCompositeDataMovementEventToDestination(int srcTaskId,
+ int destTaskId)
+ throws Exception {
+ int index = CartesianProductCombination.fromTaskId(numTasks, destTaskId)
+ .getCombination().get(positionId);
+ return index == srcTaskId ? EventRouteMetadata.create(1, new int[]{0}, new int[]{0}) : null;
+ }
+
+ @Nullable
+ @Override
+ public EventRouteMetadata routeInputSourceTaskFailedEventToDestination(int srcTaskId,
+ int destTaskId)
+ throws Exception {
+ int index = CartesianProductCombination.fromTaskId(numTasks, destTaskId)
+ .getCombination().get(positionId);
+ return index == srcTaskId ? EventRouteMetadata.create(1, new int[]{0}) : null;
+ }
+
+ @Override
+ public int getNumDestinationTaskPhysicalInputs(int destTaskId) {
+ return 1;
+ }
+
+ @Override
+ public int getNumSourceTaskPhysicalOutputs(int srcTaskId) {
+ return 1;
+ }
+
+ @Override
+ public int getNumDestinationConsumerTasks(int sourceTaskIndex) {
+ return numDestinationConsumerTasks;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/tez/blob/1a068b23/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductFilter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductFilter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductFilter.java
new file mode 100644
index 0000000..5b6456e
--- /dev/null
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductFilter.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.runtime.library.cartesianproduct;
+
+import org.apache.tez.dag.api.UserPayload;
+
+import java.util.Map;
+
+import static org.apache.hadoop.classification.InterfaceStability.Evolving;
+
+/**
+ * Base class for custom cartesian product filters. Users extend this class and override
+ * {@link #isValidCombination(Map)} to decide which combinations are kept.
+ */
+@Evolving
+public abstract class CartesianProductFilter {
+  // assigned once in the constructor and only ever read afterwards
+  private final UserPayload userPayload;
+
+  /**
+   * @param payload user payload made available to the filter through {@link #getUserPayload()}
+   */
+  public CartesianProductFilter(UserPayload payload) {
+    this.userPayload = payload;
+  }
+
+  /**
+   * @param vertexPartitionMap the map from vertex name to partition id
+   * @return whether this combination of partitions is valid
+   */
+  public abstract boolean isValidCombination(Map<String, Integer> vertexPartitionMap);
+
+  /**
+   * @return the payload passed to the constructor
+   */
+  public UserPayload getUserPayload() {
+    return userPayload;
+  }
+}
http://git-wip-us.apache.org/repos/asf/tez/blob/1a068b23/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductFilterDescriptor.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductFilterDescriptor.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductFilterDescriptor.java
new file mode 100644
index 0000000..bc81755
--- /dev/null
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductFilterDescriptor.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.runtime.library.cartesianproduct;
+
+import org.apache.tez.dag.api.EntityDescriptor;
+
+/**
+ * {@link EntityDescriptor} specialization that identifies a cartesian product filter by its
+ * class name.
+ */
+public class CartesianProductFilterDescriptor
+    extends EntityDescriptor<CartesianProductFilterDescriptor> {
+
+  /**
+   * @param filterClassName class name forwarded unchanged to the {@link EntityDescriptor}
+   *        constructor
+   */
+  public CartesianProductFilterDescriptor(String filterClassName) {
+    super(filterClassName);
+  }
+}
http://git-wip-us.apache.org/repos/asf/tez/blob/1a068b23/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductVertexManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductVertexManager.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductVertexManager.java
new file mode 100644
index 0000000..659d3b7
--- /dev/null
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductVertexManager.java
@@ -0,0 +1,139 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.runtime.library.cartesianproduct;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import org.apache.tez.dag.api.EdgeProperty;
+import org.apache.tez.dag.api.InputDescriptor;
+import org.apache.tez.dag.api.TezException;
+import org.apache.tez.dag.api.VertexManagerPlugin;
+import org.apache.tez.dag.api.VertexManagerPluginContext;
+import org.apache.tez.dag.api.event.VertexStateUpdate;
+import org.apache.tez.runtime.api.Event;
+import org.apache.tez.runtime.api.TaskAttemptIdentifier;
+import org.apache.tez.runtime.api.events.VertexManagerEvent;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * This vertex manager wraps a real vertex manager implementation object. It chooses the
+ * partitioned or the unpartitioned implementation according to the config, and forwards every
+ * callback except {@link #onRootVertexInitialized} to that implementation.
+ */
+public class CartesianProductVertexManager extends VertexManagerPlugin {
+  public static final String TEZ_CAERESIAN_PRODUCT_SLOW_START_MIN_FRACTION =
+      "tez.cartesian-product.min-src-fraction";
+  public static final float TEZ_CAERESIAN_PRODUCT_SLOW_START_MIN_FRACTION_DEFAULT = 0.25f;
+  // Must be a different key than the min fraction above; sharing one key would make the two
+  // settings impossible to configure independently.
+  public static final String TEZ_CAERESIAN_PRODUCT_SLOW_START_MAX_FRACTION =
+      "tez.cartesian-product.max-src-fraction";
+  public static final float TEZ_CAERESIAN_PRODUCT_SLOW_START_MAX_FRACTION_DEFAULT = 0.75f;
+
+  private CartesianProductVertexManagerReal vertexManagerReal = null;
+
+  public CartesianProductVertexManager(VertexManagerPluginContext context) {
+    super(context);
+  }
+
+  /**
+   * Parses the config from the user payload, verifies that the config and the DAG agree on
+   * which source vertices take part in the cartesian product, then creates and initializes
+   * the real vertex manager.
+   *
+   * @throws IllegalArgumentException if the config and the DAG are inconsistent
+   */
+  @Override
+  public void initialize() throws Exception {
+    CartesianProductVertexManagerConfig config =
+        CartesianProductVertexManagerConfig.fromUserPayload(getContext().getUserPayload());
+    // check whether DAG and config are consistent
+    Map<String, EdgeProperty> edgePropertyMap = getContext().getInputVertexEdgeProperties();
+    Set<String> sourceVerticesDAG = edgePropertyMap.keySet();
+    Set<String> sourceVerticesConfig = new HashSet<>(config.getSourceVertices());
+
+    // Direction 1: every incoming edge of the DAG must agree with the config. The membership
+    // test has to be made against the config; testing against the DAG key set would be
+    // trivially true for every entry of this very map.
+    for (Map.Entry<String, EdgeProperty> entry : edgePropertyMap.entrySet()) {
+      String vertex = entry.getKey();
+      if (hasCartesianProductEdgeManager(entry.getValue())) {
+        Preconditions.checkArgument(sourceVerticesConfig.contains(vertex),
+            vertex + " has CartesianProductEdgeManager but isn't in " +
+                "CartesianProductVertexManagerConfig");
+      } else {
+        Preconditions.checkArgument(!sourceVerticesConfig.contains(vertex),
+            vertex + " has no CartesianProductEdgeManager but is in " +
+                "CartesianProductVertexManagerConfig");
+      }
+    }
+
+    // Direction 2: every vertex named in the config must be a source vertex of the DAG that
+    // is connected through a CartesianProductEdgeManager.
+    for (String vertex : sourceVerticesConfig) {
+      Preconditions.checkArgument(sourceVerticesDAG.contains(vertex),
+          vertex + " is in CartesianProductVertexManagerConfig but not a source vertex in DAG");
+      Preconditions.checkArgument(hasCartesianProductEdgeManager(edgePropertyMap.get(vertex)),
+          vertex + " is in CartesianProductVertexManagerConfig and a source vertex, but has no " +
+              "CartesianProductEdgeManager");
+    }
+
+    vertexManagerReal = config.getIsPartitioned()
+        ? new CartesianProductVertexManagerPartitioned(getContext())
+        : new CartesianProductVertexManagerUnpartitioned(getContext());
+    vertexManagerReal.initialize(config);
+  }
+
+  /**
+   * Tells whether the edge is managed by {@link CartesianProductEdgeManager}. An edge without
+   * any edge manager descriptor is treated as not being a cartesian product edge.
+   */
+  private static boolean hasCartesianProductEdgeManager(EdgeProperty edgeProperty) {
+    return edgeProperty != null
+        && edgeProperty.getEdgeManagerDescriptor() != null
+        && CartesianProductEdgeManager.class.getName()
+            .equals(edgeProperty.getEdgeManagerDescriptor().getClassName());
+  }
+
+  @VisibleForTesting
+  protected CartesianProductVertexManagerReal getVertexManagerReal() {
+    return this.vertexManagerReal;
+  }
+
+  /**
+   * Forwards the event to the real vertex manager. No real implementation acts on it yet;
+   * it will be used for locality based optimization in future.
+   * @param vmEvent
+   * @throws Exception
+   */
+  @Override
+  public void onVertexManagerEventReceived(VertexManagerEvent vmEvent) throws Exception {
+    vertexManagerReal.onVertexManagerEventReceived(vmEvent);
+  }
+
+  /**
+   * Currently direct input to cartesian product vertex is not supported, so this always fails.
+   * @param inputName
+   * @param inputDescriptor
+   * @param events
+   * @throws TezException always
+   */
+  @Override
+  public void onRootVertexInitialized(String inputName, InputDescriptor inputDescriptor,
+                                      List<Event> events) throws Exception {
+    throw new TezException("Direct input to cartesian product vertex is not supported yet");
+  }
+
+  @Override
+  public void onVertexStarted(List<TaskAttemptIdentifier> completions) throws Exception {
+    vertexManagerReal.onVertexStarted(completions);
+  }
+
+  @Override
+  public void onVertexStateUpdated(VertexStateUpdate stateUpdate) throws Exception {
+    vertexManagerReal.onVertexStateUpdated(stateUpdate);
+  }
+
+  @Override
+  public void onSourceTaskCompleted(TaskAttemptIdentifier attempt) throws Exception {
+    vertexManagerReal.onSourceTaskCompleted(attempt);
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/tez/blob/1a068b23/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductVertexManagerConfig.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductVertexManagerConfig.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductVertexManagerConfig.java
new file mode 100644
index 0000000..b324524
--- /dev/null
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductVertexManagerConfig.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.runtime.library.cartesianproduct;
+
+import com.google.common.base.Preconditions;
+import com.google.common.primitives.Ints;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.InvalidProtocolBufferException;
+import org.apache.tez.dag.api.UserPayload;
+
+import java.nio.ByteBuffer;
+
+import static org.apache.tez.runtime.library.cartesianproduct.CartesianProductUserPayload.*;
+
+/**
+ * Cartesian product config as seen by the vertex manager: the common config plus the slow
+ * start min/max fractions.
+ */
+class CartesianProductVertexManagerConfig extends CartesianProductConfig {
+  private final float minFraction;
+  private final float maxFraction;
+
+  /**
+   * @throws IllegalArgumentException if {@code minFraction} is greater than {@code maxFraction}
+   */
+  public CartesianProductVertexManagerConfig(boolean isPartitioned, String[] sourceVertices,
+                                             int[] numPartitions,
+                                             float minFraction, float maxFraction,
+                                             CartesianProductFilterDescriptor filterDescriptor) {
+    super(isPartitioned, numPartitions, sourceVertices, filterDescriptor);
+    Preconditions.checkArgument(minFraction <= maxFraction,
+        "min fraction(" + minFraction + ") should be less than max fraction(" +
+            maxFraction + ") in cartesian product slow start");
+    this.minFraction = minFraction;
+    this.maxFraction = maxFraction;
+  }
+
+  public float getMinFraction() {
+    return minFraction;
+  }
+
+  public float getMaxFraction() {
+    return maxFraction;
+  }
+
+  /**
+   * Rebuilds the config from its protobuf form carried in the user payload.
+   *
+   * @param payload user payload holding a serialized {@code CartesianProductConfigProto}
+   * @return the decoded config; number of partitions and filter descriptor are null when the
+   *         proto does not carry them
+   * @throws InvalidProtocolBufferException if the payload cannot be parsed
+   * @throws IllegalArgumentException if the proto carries a filter user payload but no filter
+   *         class name
+   */
+  public static CartesianProductVertexManagerConfig fromUserPayload(UserPayload payload)
+      throws InvalidProtocolBufferException {
+    CartesianProductConfigProto proto =
+        CartesianProductConfigProto.parseFrom(ByteString.copyFrom(payload.getPayload()));
+
+    boolean isPartitioned = proto.getIsPartitioned();
+    String[] sourceVertices = new String[proto.getSourceVerticesList().size()];
+    proto.getSourceVerticesList().toArray(sourceVertices);
+    int[] numPartitions =
+        proto.getNumPartitionsCount() == 0 ? null : Ints.toArray(proto.getNumPartitionsList());
+
+    CartesianProductFilterDescriptor filterDescriptor = null;
+    if (proto.hasFilterClassName()) {
+      filterDescriptor = new CartesianProductFilterDescriptor(proto.getFilterClassName());
+    }
+    if (proto.hasFilterUserPayload()) {
+      // A payload without a filter class has nothing to attach to; report it clearly instead
+      // of failing with a NullPointerException on the missing descriptor.
+      Preconditions.checkArgument(filterDescriptor != null,
+          "cartesian product config has a filter user payload but no filter class name");
+      filterDescriptor.setUserPayload(
+          UserPayload.create(ByteBuffer.wrap(proto.getFilterUserPayload().toByteArray())));
+    }
+
+    float minFraction = proto.getMinFraction();
+    float maxFraction = proto.getMaxFraction();
+    return new CartesianProductVertexManagerConfig(isPartitioned, sourceVertices, numPartitions,
+        minFraction, maxFraction, filterDescriptor);
+  }
+}
http://git-wip-us.apache.org/repos/asf/tez/blob/1a068b23/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductVertexManagerPartitioned.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductVertexManagerPartitioned.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductVertexManagerPartitioned.java
new file mode 100644
index 0000000..af2abae
--- /dev/null
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductVertexManagerPartitioned.java
@@ -0,0 +1,176 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.runtime.library.cartesianproduct;
+
+import com.google.common.base.Preconditions;
+import com.google.common.primitives.Ints;
+import org.apache.tez.common.ReflectionUtils;
+import org.apache.tez.dag.api.TezReflectionException;
+import org.apache.tez.dag.api.UserPayload;
+import org.apache.tez.dag.api.VertexManagerPluginContext;
+import org.apache.tez.dag.api.VertexManagerPluginContext.ScheduleTaskRequest;
+import org.apache.tez.dag.api.event.VertexState;
+import org.apache.tez.dag.api.event.VertexStateUpdate;
+import org.apache.tez.runtime.api.TaskAttemptIdentifier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.BitSet;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Starts scheduling tasks when number of completed source tasks crosses
+ * min fraction and schedules all task when max fraction is reached
+ */
+class CartesianProductVertexManagerPartitioned extends CartesianProductVertexManagerReal {
+ private CartesianProductVertexManagerConfig config;
+ private List<String> sourceVertices;
+ private int parallelism = 0;
+ private boolean vertexStarted = false;
+ private boolean vertexReconfigured = false;
+ private int numSourceVertexConfigured = 0;
+ private CartesianProductFilter filter;
+ private Map<String, BitSet> sourceTaskCompleted = new HashMap<>();
+ private int numFinishedSrcTasks = 0;
+ private int totalNumSrcTasks = 0;
+ private int lastScheduledTaskId = -1;
+ private static final Logger LOG =
+ LoggerFactory.getLogger(CartesianProductVertexManagerPartitioned.class);
+
+ public CartesianProductVertexManagerPartitioned(VertexManagerPluginContext context) {
+ super(context);
+ }
+
+ @Override
+ public void initialize(CartesianProductVertexManagerConfig config) throws TezReflectionException {
+ this.config = config;
+ this.sourceVertices = config.getSourceVertices();
+ CartesianProductFilterDescriptor filterDescriptor = config.getFilterDescriptor();
+ if (filterDescriptor != null) {
+ try {
+ filter = ReflectionUtils.createClazzInstance(filterDescriptor.getClassName(),
+ new Class[]{UserPayload.class}, new UserPayload[]{filterDescriptor.getUserPayload()});
+ } catch (TezReflectionException e) {
+ LOG.error("Creating filter failed");
+ throw e;
+ }
+ }
+ for (String sourceVertex : sourceVertices) {
+ sourceTaskCompleted.put(sourceVertex, new BitSet());
+ }
+ for (String vertex : sourceVertices) {
+ getContext().registerForVertexStateUpdates(vertex, EnumSet.of(VertexState.CONFIGURED));
+ }
+ getContext().vertexReconfigurationPlanned();
+ }
+
+ private void reconfigureVertex() throws IOException {
+ // try all combinations, check against filter and get final parallelism
+ Map<String, Integer> vertexPartitionMap = new HashMap<>();
+
+ CartesianProductCombination combination =
+ new CartesianProductCombination(Ints.toArray(config.getNumPartitions()));
+ combination.firstTask();
+ do {
+ for (int i = 0; i < sourceVertices.size(); i++) {
+ vertexPartitionMap.put(sourceVertices.get(i), combination.getCombination().get(i));
+ }
+ if (filter == null || filter.isValidCombination(vertexPartitionMap)) {
+ parallelism++;
+ }
+ } while (combination.nextTask());
+ // no need to reconfigure EM because EM already has all necessary information via config object
+ getContext().reconfigureVertex(parallelism, null, null);
+ vertexReconfigured = true;
+ getContext().doneReconfiguringVertex();
+ }
+
+ @Override
+ public synchronized void onVertexStarted(List<TaskAttemptIdentifier> completions)
+ throws Exception {
+ vertexStarted = true;
+ if (completions != null) {
+ for (TaskAttemptIdentifier attempt : completions) {
+ onSourceTaskCompleted(attempt);
+ }
+ }
+ // try schedule because there may be no more vertex state update and source completions
+ tryScheduleTask();
+ }
+
+ @Override
+ public synchronized void onVertexStateUpdated(VertexStateUpdate stateUpdate) throws IOException{
+ Preconditions.checkArgument(stateUpdate.getVertexState() == VertexState.CONFIGURED);
+ if (!vertexReconfigured) {
+ reconfigureVertex();
+ }
+ numSourceVertexConfigured++;
+ totalNumSrcTasks += getContext().getVertexNumTasks(stateUpdate.getVertexName());
+ // try schedule because there may be no more vertex start and source completions
+ tryScheduleTask();
+ }
+
+ @Override
+ public synchronized void onSourceTaskCompleted(TaskAttemptIdentifier attempt) throws Exception {
+ int taskId = attempt.getTaskIdentifier().getIdentifier();
+ String vertex = attempt.getTaskIdentifier().getVertexIdentifier().getName();
+ BitSet bitSet = this.sourceTaskCompleted.get(vertex);
+ if (!bitSet.get(taskId)) {
+ bitSet.set(taskId);
+ numFinishedSrcTasks++;
+ tryScheduleTask();
+ }
+ }
+
+ /**
+ * schedule task as the ascending order of id. Slow start has same behavior as ShuffleVertexManager
+ */
+ private void tryScheduleTask() {
+ // only schedule task when vertex is already started and all source vertices are configured
+ if (!vertexStarted
+ || numSourceVertexConfigured != sourceVertices.size()) {
+ return;
+ }
+ // determine the destination task with largest id to schedule
+ float percentFinishedSrcTask = numFinishedSrcTasks*1f/totalNumSrcTasks;
+ int numTaskToSchedule;
+ if (percentFinishedSrcTask < config.getMinFraction()) {
+ numTaskToSchedule = 0;
+ } else if (config.getMinFraction() <= percentFinishedSrcTask &&
+ percentFinishedSrcTask <= config.getMaxFraction()) {
+ numTaskToSchedule = (int) ((percentFinishedSrcTask-config.getMinFraction())
+ /(config.getMaxFraction()-config.getMinFraction())*parallelism);
+ } else {
+ numTaskToSchedule = parallelism;
+ }
+ // schedule tasks if there are more we can schedule
+ if (numTaskToSchedule-1 > lastScheduledTaskId) {
+ List<ScheduleTaskRequest> scheduleTaskRequests = new ArrayList<>();
+ for (int i = lastScheduledTaskId + 1; i < numTaskToSchedule; i++) {
+ scheduleTaskRequests.add(ScheduleTaskRequest.create(i, null));
+ }
+ lastScheduledTaskId = numTaskToSchedule-1;
+ getContext().scheduleTasks(scheduleTaskRequests);
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/tez/blob/1a068b23/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductVertexManagerReal.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductVertexManagerReal.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductVertexManagerReal.java
new file mode 100644
index 0000000..84e65ac
--- /dev/null
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductVertexManagerReal.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.runtime.library.cartesianproduct;
+
+import org.apache.tez.dag.api.VertexManagerPluginContext;
+import org.apache.tez.dag.api.event.VertexStateUpdate;
+import org.apache.tez.runtime.api.TaskAttemptIdentifier;
+import org.apache.tez.runtime.api.events.VertexManagerEvent;
+
+import java.util.List;
+
+/**
+ * Common parent of the concrete cartesian product vertex manager implementations. It keeps
+ * the plugin context and declares the callbacks each implementation has to handle.
+ */
+abstract class CartesianProductVertexManagerReal {
+  private final VertexManagerPluginContext context;
+
+  public CartesianProductVertexManagerReal(VertexManagerPluginContext context) {
+    this.context = context;
+  }
+
+  /**
+   * @return the plugin context that was handed to the constructor
+   */
+  public final VertexManagerPluginContext getContext() {
+    return context;
+  }
+
+  /**
+   * Sets this vertex manager up from the given configuration.
+   */
+  public abstract void initialize(CartesianProductVertexManagerConfig config) throws Exception;
+
+  /**
+   * Does nothing by default; implementations may override it.
+   */
+  public void onVertexManagerEventReceived(VertexManagerEvent vmEvent) {
+  }
+
+  public abstract void onVertexStarted(List<TaskAttemptIdentifier> completions) throws Exception;
+
+  public abstract void onVertexStateUpdated(VertexStateUpdate stateUpdate) throws Exception;
+
+  public abstract void onSourceTaskCompleted(TaskAttemptIdentifier attempt) throws Exception;
+}