You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by zh...@apache.org on 2017/04/10 18:20:15 UTC
[2/2] tez git commit: TEZ-3654. Make CartesianProduct edge work with
GroupInputEdge (zhiyuany)
TEZ-3654. Make CartesianProduct edge work with GroupInputEdge (zhiyuany)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/f355a050
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/f355a050
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/f355a050
Branch: refs/heads/master
Commit: f355a050cf012854d1f7145d05f1981a3cf97e67
Parents: e0ee28a
Author: Zhiyuan Yang <zh...@apache.org>
Authored: Mon Apr 10 11:18:33 2017 -0700
Committer: Zhiyuan Yang <zh...@apache.org>
Committed: Mon Apr 10 11:18:33 2017 -0700
----------------------------------------------------------------------
.../tez/dag/api/EdgeManagerPluginContext.java | 4 +
.../tez/dag/api/VertexManagerPluginContext.java | 9 +
.../java/org/apache/tez/dag/app/dag/Vertex.java | 2 +-
.../org/apache/tez/dag/app/dag/impl/Edge.java | 12 +
.../apache/tez/dag/app/dag/impl/VertexImpl.java | 4 +-
.../tez/dag/app/dag/impl/VertexManager.java | 14 +
.../apache/tez/dag/app/dag/impl/TestEdge.java | 32 ++
.../tez/dag/app/dag/impl/TestVertexImpl.java | 6 +-
.../tez/dag/app/dag/impl/TestVertexManager.java | 26 +
.../CartesianProductCombination.java | 78 ++-
.../CartesianProductConfig.java | 60 +-
.../CartesianProductEdgeManagerConfig.java | 39 +-
.../CartesianProductEdgeManagerPartitioned.java | 8 +-
.../CartesianProductEdgeManagerReal.java | 1 -
...artesianProductEdgeManagerUnpartitioned.java | 57 +-
.../CartesianProductVertexManager.java | 51 +-
.../CartesianProductVertexManagerConfig.java | 40 +-
...artesianProductVertexManagerPartitioned.java | 10 +-
...tesianProductVertexManagerUnpartitioned.java | 316 +++++++----
.../main/proto/CartesianProductPayload.proto | 9 +-
.../TestShuffleVertexManagerUtils.java | 5 +
.../TestCartesianProductCombination.java | 30 +-
.../TestCartesianProductConfig.java | 6 +-
.../TestCartesianProductEdgeManager.java | 6 +-
.../TestCartesianProductEdgeManagerConfig.java | 15 +-
...tCartesianProductEdgeManagerPartitioned.java | 13 +-
...artesianProductEdgeManagerUnpartitioned.java | 408 +++++++------
.../TestCartesianProductVertexManager.java | 19 +
...TestCartesianProductVertexManagerConfig.java | 8 +-
...tesianProductVertexManagerUnpartitioned.java | 565 +++++++++++--------
30 files changed, 1099 insertions(+), 754 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/f355a050/tez-api/src/main/java/org/apache/tez/dag/api/EdgeManagerPluginContext.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/EdgeManagerPluginContext.java b/tez-api/src/main/java/org/apache/tez/dag/api/EdgeManagerPluginContext.java
index 79f685d..ef6925b 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/EdgeManagerPluginContext.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/EdgeManagerPluginContext.java
@@ -58,4 +58,8 @@ public interface EdgeManagerPluginContext {
*/
public int getDestinationVertexNumTasks();
+ /**
+ * @return the name of the vertex group that the source vertex belongs to, or null if it belongs to none
+ */
+ String getVertexGroupName();
}
http://git-wip-us.apache.org/repos/asf/tez/blob/f355a050/tez-api/src/main/java/org/apache/tez/dag/api/VertexManagerPluginContext.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/VertexManagerPluginContext.java b/tez-api/src/main/java/org/apache/tez/dag/api/VertexManagerPluginContext.java
index aa99745..b858a65 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/VertexManagerPluginContext.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/VertexManagerPluginContext.java
@@ -350,4 +350,13 @@ public interface VertexManagerPluginContext {
// TODO must be done later after TEZ-1714
//public void vertexManagerDone();
+ /**
+ * Get the input vertex groups of this vertex, including each vertex group's name and
+ * the names of all its member vertices
+ *
+ * @return map whose key is vertex group name and value is list of members' name,
+ * or empty map if there is no input vertex group.
+ */
+ Map<String, List<String>> getInputVertexGroups();
+
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/tez/blob/f355a050/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java
index 1b3b39c..51847d4 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java
@@ -149,7 +149,7 @@ public interface Vertex extends Comparable<Vertex> {
List<InputSpec> getInputSpecList(int taskIndex) throws AMUserCodeException;
List<OutputSpec> getOutputSpecList(int taskIndex) throws AMUserCodeException;
- List<GroupInputSpec> getGroupInputSpecList(int taskIndex);
+ List<GroupInputSpec> getGroupInputSpecList();
void addSharedOutputs(Set<String> outputs);
Set<String> getSharedOutputs();
http://git-wip-us.apache.org/repos/asf/tez/blob/f355a050/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/Edge.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/Edge.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/Edge.java
index 690df63..f78c9a5 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/Edge.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/Edge.java
@@ -28,6 +28,7 @@ import java.util.zip.Deflater;
import org.apache.hadoop.conf.Configuration;
import org.apache.tez.common.TezCommonUtils;
import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.runtime.api.impl.GroupInputSpec;
import org.apache.tez.runtime.library.common.shuffle.ShuffleUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -129,6 +130,17 @@ public class Edge {
return destinationVertex.getTotalTasks();
}
+ @Override
+ public String getVertexGroupName() {
+ if (destinationVertex.getGroupInputSpecList() != null) {
+ for (GroupInputSpec group : destinationVertex.getGroupInputSpecList()) {
+ if (group.getGroupVertices().contains(getSourceVertexName())) {
+ return group.getGroupName();
+ }
+ }
+ }
+ return null;
+ }
}
private EdgeProperty edgeProperty;
http://git-wip-us.apache.org/repos/asf/tez/blob/f355a050/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
index 1ab3da8..ab17fe4 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
@@ -1581,7 +1581,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
return TaskSpec.createBaseTaskSpec(getDAG().getName(),
getName(), getTotalTasks(), getProcessorDescriptor(),
getInputSpecList(taskIndex), getOutputSpecList(taskIndex),
- getGroupInputSpecList(taskIndex), vertexOnlyConf);
+ getGroupInputSpecList(), vertexOnlyConf);
}
@Override
@@ -4422,7 +4422,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
@Override
- public List<GroupInputSpec> getGroupInputSpecList(int taskIndex) {
+ public List<GroupInputSpec> getGroupInputSpecList() {
readLock.lock();
try {
return groupInputSpecList;
http://git-wip-us.apache.org/repos/asf/tez/blob/f355a050/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java
index 45f72bd..b7d3428 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java
@@ -24,6 +24,7 @@ import java.lang.reflect.UndeclaredThrowableException;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -35,6 +36,7 @@ import javax.annotation.Nullable;
import org.apache.commons.lang.exception.ExceptionUtils;
import org.apache.tez.dag.app.dag.event.DAGEventInternalError;
+import org.apache.tez.runtime.api.impl.GroupInputSpec;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.security.UserGroupInformation;
@@ -360,6 +362,18 @@ public class VertexManager {
}
@Override
+ public Map<String, List<String>> getInputVertexGroups() {
+ checkAndThrowIfDone();
+ Map<String, List<String>> inputGroups = Maps.newHashMap();
+ if (managedVertex.getGroupInputSpecList() != null) {
+ for (GroupInputSpec group : managedVertex.getGroupInputSpecList()) {
+ inputGroups.put(group.getGroupName(), Collections.unmodifiableList(group.getGroupVertices()));
+ }
+ }
+ return inputGroups;
+ }
+
+ @Override
public void onStateUpdated(VertexStateUpdate event) {
// this is not called by the vertex manager plugin.
// no need to synchronize this. similar to other external notification methods
http://git-wip-us.apache.org/repos/asf/tez/blob/f355a050/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestEdge.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestEdge.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestEdge.java
index f53e505..1143395 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestEdge.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestEdge.java
@@ -19,6 +19,7 @@
package org.apache.tez.dag.app.dag.impl;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.doReturn;
@@ -34,6 +35,7 @@ import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.LinkedHashMap;
@@ -67,7 +69,9 @@ import org.apache.tez.runtime.api.events.DataMovementEvent;
import org.apache.tez.runtime.api.events.InputReadErrorEvent;
import org.apache.tez.runtime.api.impl.EventMetaData;
import org.apache.tez.runtime.api.impl.EventMetaData.EventProducerConsumerType;
+import org.apache.tez.runtime.api.impl.GroupInputSpec;
import org.apache.tez.runtime.api.impl.TezEvent;
+import org.apache.tez.test.EdgeManagerForTest;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
@@ -476,4 +480,32 @@ public class TestEdge {
return emConf.sourceTaskIndex;
}
}
+
+ @Test(timeout = 5000)
+ public void testEdgeManagerPluginCtxGetVertexGroupName() throws TezException {
+ EdgeManagerPluginDescriptor edgeManagerDescriptor =
+ EdgeManagerPluginDescriptor.create(EdgeManagerForTest.class.getName());
+ EdgeProperty edgeProp = EdgeProperty.create(edgeManagerDescriptor,
+ DataSourceType.PERSISTED, SchedulingType.SEQUENTIAL, OutputDescriptor.create("Out"),
+ InputDescriptor.create("In"));
+ Edge edge = new Edge(edgeProp, null, null);
+
+ Vertex srcV = mock(Vertex.class), destV = mock(Vertex.class);
+ String srcName = "srcV", destName = "destV";
+ when(srcV.getName()).thenReturn(srcName);
+ when(destV.getName()).thenReturn(destName);
+ edge.setSourceVertex(srcV);
+ edge.setDestinationVertex(destV);
+
+ assertNull(edge.edgeManager.getContext().getVertexGroupName());
+
+ String group = "group";
+ when(destV.getGroupInputSpecList())
+ .thenReturn(Arrays.asList(new GroupInputSpec(group, Arrays.asList("v1", "v3"), null)));
+ assertNull(edge.edgeManager.getContext().getVertexGroupName());
+
+ when(destV.getGroupInputSpecList())
+ .thenReturn(Arrays.asList(new GroupInputSpec(group, Arrays.asList(srcName, "v3"), null)));
+ assertEquals(group, edge.edgeManager.getContext().getVertexGroupName());
+ }
}
http://git-wip-us.apache.org/repos/asf/tez/blob/f355a050/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
index 90d675e..bc06fd0 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
@@ -5940,10 +5940,10 @@ public class TestVertexImpl {
VertexEventType.V_INIT));
dispatcher.await();
- Assert.assertNull(vA.getGroupInputSpecList(0));
- Assert.assertNull(vB.getGroupInputSpecList(0));
+ Assert.assertNull(vA.getGroupInputSpecList());
+ Assert.assertNull(vB.getGroupInputSpecList());
- List<GroupInputSpec> groupInSpec = vC.getGroupInputSpecList(0);
+ List<GroupInputSpec> groupInSpec = vC.getGroupInputSpecList();
Assert.assertEquals(1, groupInSpec.size());
Assert.assertEquals("Group", groupInSpec.get(0).getGroupName());
assertTrue(groupInSpec.get(0).getGroupVertices().contains("A"));
http://git-wip-us.apache.org/repos/asf/tez/blob/f355a050/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexManager.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexManager.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexManager.java
index b93b298..6bec26e 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexManager.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexManager.java
@@ -20,6 +20,8 @@ package org.apache.tez.dag.app.dag.impl;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
import static org.mockito.Mockito.doReturn;
@@ -30,6 +32,8 @@ import static org.mockito.Mockito.when;
import java.io.IOException;
import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
@@ -55,6 +59,7 @@ import org.apache.tez.runtime.api.Event;
import org.apache.tez.runtime.api.TaskAttemptIdentifier;
import org.apache.tez.runtime.api.events.InputDataInformationEvent;
import org.apache.tez.runtime.api.events.VertexManagerEvent;
+import org.apache.tez.runtime.api.impl.GroupInputSpec;
import org.apache.tez.runtime.api.impl.TezEvent;
import org.junit.Before;
import org.junit.Test;
@@ -205,6 +210,27 @@ public class TestVertexManager {
assertEquals(Sets.newHashSet("input1","input2"), edgeVertexSet);
}
+ @Test(timeout = 5000)
+ public void testVMPluginCtxGetInputVertexGroup() throws Exception {
+ VertexManager vm =
+ new VertexManager(
+ VertexManagerPluginDescriptor.create(CustomVertexManager.class
+ .getName()), UserGroupInformation.getCurrentUser(),
+ mockVertex, mockAppContext, mock(StateChangeNotifier.class));
+
+ assertTrue(vm.pluginContext.getInputVertexGroups().isEmpty());
+
+ String group = "group", v1 = "v1", v2 = "v2";
+ when(mockVertex.getGroupInputSpecList())
+ .thenReturn(Arrays.asList(new GroupInputSpec(group, Arrays.asList(v1, v2), null)));
+ Map<String, List<String>> groups = vm.pluginContext.getInputVertexGroups();
+ assertEquals(1, groups.size());
+ assertTrue(groups.containsKey(group));
+ assertEquals(2, groups.get(group).size());
+ assertTrue(groups.get(group).contains(v1));
+ assertTrue(groups.get(group).contains(v2));
+ }
+
public static class CustomVertexManager extends VertexManagerPlugin {
private Map<String,List<Event>> cachedEventMap = new HashMap<String, List<Event>>();
http://git-wip-us.apache.org/repos/asf/tez/blob/f355a050/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductCombination.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductCombination.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductCombination.java
index c6c95f2..97f3eb2 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductCombination.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductCombination.java
@@ -25,49 +25,47 @@ import java.util.Collections;
import java.util.List;
/**
- * Represent the combination of source partitions or tasks.
+ * Represent the combination of source chunks. A chunk is one or more source tasks or partitions.
*
- * For example, if we have two source vertices and each generates two partition, we will have 2*2=4
- * destination tasks. The mapping from source partition/task to destination task is like this:
+ * For example, if we have two source vertices and each generates two chunks, we will have 2*2=4
+ * destination tasks. The mapping from source chunks to destination task is like this:
* <0, 0> -> 0, <0, 1> -> 1, <1, 0> -> 2, <1, 1> -> 3;
*
- * Basically, it stores the source partition/task combination and can compute corresponding
+ * Basically, it stores the source chunk id combination and can compute corresponding
* destination task. It can also figure out the source combination from a given destination task.
- * Task id is mapped in the ascending order of combinations, starting from 0. <field>factor</field>
- * is the helper array to computer task id, so task id = (combination) dot-product (factor)
+ * Task id is mapped in the ascending order of combinations, starting from 0.
*
* You can traverse all combinations with <method>firstTask</method> and <method>nextTask</method>,
* like <0, 0> -> <0, 1> -> <1, 0> -> <1, 1>.
*
- * Or you can also traverse all combinations that has one specific partition with
- * <method>firstTaskWithFixedPartition</method> and <method>nextTaskWithFixedPartition</method>,
+ * Or you can also traverse all combinations that have one specific chunk with
+ * <method>firstTaskWithFixedChunk</method> and <method>nextTaskWithFixedChunk</method>,
* like <0, 1, 0> -> <0, 1, 1> -> <1, 1, 0> -> <1, 1, 1> (all combinations with 2nd vertex's 2nd
- * partition.
+ * chunk).
*/
class CartesianProductCombination {
- // numPartitions for partitioned case, numTasks for unpartitioned case
- private int[] numPartitionOrTask;
- // at which position (in source vertices array) our vertex is
+ private int[] numChunk;
+ // which position (in source vertices array) we care about
private int positionId = -1;
- // The i-th element Ci represents partition/task Ci of source vertex i.
+ // The i-th element Ci represents chunk Ci of source vertex i.
private final Integer[] combination;
- // the weight of each vertex when computing the task id
+ // helper array to compute task id: task id = (combination) dot-product (factor)
private final Integer[] factor;
- public CartesianProductCombination(int[] numPartitionOrTask) {
- Preconditions.checkArgument(!Ints.contains(numPartitionOrTask, 0),
- "CartesianProductCombination doesn't allow zero partition or task");
- this.numPartitionOrTask = Arrays.copyOf(numPartitionOrTask, numPartitionOrTask.length);
- combination = new Integer[numPartitionOrTask.length];
- factor = new Integer[numPartitionOrTask.length];
+ public CartesianProductCombination(int[] numChunk) {
+ Preconditions.checkArgument(!Ints.contains(numChunk, 0),
+ "CartesianProductCombination doesn't allow zero chunk");
+ this.numChunk = Arrays.copyOf(numChunk, numChunk.length);
+ combination = new Integer[numChunk.length];
+ factor = new Integer[numChunk.length];
factor[factor.length-1] = 1;
for (int i = combination.length-2; i >= 0; i--) {
- factor[i] = factor[i+1]*numPartitionOrTask[i+1];
+ factor[i] = factor[i+1]* numChunk[i+1];
}
}
- public CartesianProductCombination(int[] numPartitionOrTask, int positionId) {
- this(numPartitionOrTask);
+ public CartesianProductCombination(int[] numChunk, int positionId) {
+ this(numChunk);
this.positionId = positionId;
}
@@ -79,24 +77,24 @@ class CartesianProductCombination {
}
/**
- * first combination with given partition id in current position
- * @param partition
+ * first combination with given chunk id in current position
+ * @param chunkId
*/
- public void firstTaskWithFixedPartition(int partition) {
+ public void firstTaskWithFixedChunk(int chunkId) {
Preconditions.checkArgument(positionId >= 0 && positionId < combination.length);
Arrays.fill(combination, 0);
- combination[positionId] = partition;
+ combination[positionId] = chunkId;
}
/**
+ * next combination that keeps the chunk in the current position unchanged
+ * next combination without current chunk in current position
* @return false if there is no next combination
*/
- public boolean nextTaskWithFixedPartition() {
+ public boolean nextTaskWithFixedChunk() {
Preconditions.checkArgument(positionId >= 0 && positionId < combination.length);
int i;
for (i = combination.length-1; i >= 0; i--) {
- if (i != positionId && combination[i] != numPartitionOrTask[i]-1) {
+ if (i != positionId && combination[i] != numChunk[i]-1) {
break;
}
}
@@ -117,20 +115,20 @@ class CartesianProductCombination {
}
/**
- * first combination with given partition id in current position
+ * first combination, i.e. chunk 0 of every source
*/
public void firstTask() {
Arrays.fill(combination, 0);
}
/**
- * next combination without current partition in current position
+ * next combination without current chunk in current position
* @return false if there is no next combination
*/
public boolean nextTask() {
int i;
for (i = combination.length-1; i >= 0; i--) {
- if (combination[i] != numPartitionOrTask[i]-1) {
+ if (combination[i] != numChunk[i]-1) {
break;
}
}
@@ -145,19 +143,19 @@ class CartesianProductCombination {
}
/**
- * @return corresponding task id for current combination
+ * @return corresponding chunk id for current combination
*/
- public int getTaskId() {
- int taskId = 0;
+ public int getChunkId() {
+ int chunkId = 0;
for (int i = 0; i < combination.length; i++) {
- taskId += combination[i]*factor[i];
+ chunkId += combination[i]*factor[i];
}
- return taskId;
+ return chunkId;
}
- public static CartesianProductCombination fromTaskId(int[] numPartitionOrTask,
+ public static CartesianProductCombination fromTaskId(int[] numChunk,
int taskId) {
- CartesianProductCombination result = new CartesianProductCombination(numPartitionOrTask);
+ CartesianProductCombination result = new CartesianProductCombination(numChunk);
for (int i = 0; i < result.combination.length; i++) {
result.combination[i] = taskId/result.factor[i];
taskId %= result.factor[i];
http://git-wip-us.apache.org/repos/asf/tez/blob/f355a050/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductConfig.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductConfig.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductConfig.java
index b57ed84..12a17cb 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductConfig.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductConfig.java
@@ -46,21 +46,23 @@ import static org.apache.tez.runtime.library.cartesianproduct.CartesianProductUs
@Evolving
public class CartesianProductConfig {
private final boolean isPartitioned;
- private final String[] sourceVertices;
+ private final String[] sources;
+ // numPartitions[i] is the number of partitions that sources[i] will generate
+ // (not used in unpartitioned case)
private final int[] numPartitions;
private final CartesianProductFilterDescriptor filterDescriptor;
/**
* create config for unpartitioned case
- * @param sourceVertices list of source vertices names
+ * @param sources list of names of source vertices or vertex groups
*/
- public CartesianProductConfig(List<String> sourceVertices) {
- Preconditions.checkArgument(sourceVertices != null, "source vertices list cannot be null");
- Preconditions.checkArgument(sourceVertices.size() > 1,
- "there must be more than 1 source " + "vertices, currently only " + sourceVertices.size());
+ public CartesianProductConfig(List<String> sources) {
+ Preconditions.checkArgument(sources != null, "source list cannot be null");
+ Preconditions.checkArgument(sources.size() > 1,
+ "there must be more than 1 source " + ", currently only " + sources.size());
this.isPartitioned = false;
- this.sourceVertices = sourceVertices.toArray(new String[sourceVertices.size()]);
+ this.sources = sources.toArray(new String[sources.size()]);
this.numPartitions = null;
this.filterDescriptor = null;
}
@@ -86,12 +88,12 @@ public class CartesianProductConfig {
this.isPartitioned = true;
this.numPartitions = new int[vertexPartitionMap.size()];
- this.sourceVertices = new String[vertexPartitionMap.size()];
+ this.sources = new String[vertexPartitionMap.size()];
this.filterDescriptor = filterDescriptor;
int i = 0;
for (Map.Entry<String, Integer> entry : vertexPartitionMap.entrySet()) {
- this.sourceVertices[i] = entry.getKey();
+ this.sources[i] = entry.getKey();
this.numPartitions[i] = entry.getValue();
i++;
}
@@ -102,23 +104,23 @@ public class CartesianProductConfig {
/**
* create config for partitioned case, with specified source vertices order
* @param numPartitions
- * @param sourceVertices
+ * @param sources
* @param filterDescriptor
*/
@VisibleForTesting
- protected CartesianProductConfig(int[] numPartitions, String[] sourceVertices,
+ protected CartesianProductConfig(int[] numPartitions, String[] sources,
CartesianProductFilterDescriptor filterDescriptor) {
Preconditions.checkArgument(numPartitions != null, "partitions count array can't be null");
- Preconditions.checkArgument(sourceVertices != null, "source vertices array can't be null");
- Preconditions.checkArgument(numPartitions.length == sourceVertices.length,
- "partitions count array(length: " + numPartitions.length + ") and source vertices array " +
- "(length: " + sourceVertices.length + ") cannot have different length");
- Preconditions.checkArgument(sourceVertices.length > 1,
- "there must be more than 1 source " + "vertices, currently only " + sourceVertices.length);
+ Preconditions.checkArgument(sources != null, "source array can't be null");
+ Preconditions.checkArgument(numPartitions.length == sources.length,
+ "partitions count array(length: " + numPartitions.length + ") and source array " +
+ "(length: " + sources.length + ") cannot have different length");
+ Preconditions.checkArgument(sources.length > 1,
+ "there must be more than 1 source " + ", currently only " + sources.length);
this.isPartitioned = true;
this.numPartitions = numPartitions;
- this.sourceVertices = sourceVertices;
+ this.sources = sources;
this.filterDescriptor = filterDescriptor;
checkNumPartitions();
@@ -128,11 +130,11 @@ public class CartesianProductConfig {
* create config for both cases, used by subclass
*/
protected CartesianProductConfig(boolean isPartitioned, int[] numPartitions,
- String[] sourceVertices,
+ String[] sources,
CartesianProductFilterDescriptor filterDescriptor) {
this.isPartitioned = isPartitioned;
this.numPartitions = numPartitions;
- this.sourceVertices = sourceVertices;
+ this.sources = sources;
this.filterDescriptor = filterDescriptor;
}
@@ -142,11 +144,11 @@ public class CartesianProductConfig {
boolean isUnpartitioned = true;
for (int i = 0; i < numPartitions.length; i++) {
Preconditions.checkArgument(this.numPartitions[i] > 0,
- "Vertex " + sourceVertices[i] + "has negative (" + numPartitions[i] + ") partitions");
+ "Vertex " + sources[i] + "has negative (" + numPartitions[i] + ") partitions");
isUnpartitioned = isUnpartitioned && numPartitions[i] == 1;
}
Preconditions.checkArgument(!isUnpartitioned,
- "every source vertex has 1 partition in a partitioned case");
+ "every source has 1 partition in a partitioned case");
} else {
Preconditions.checkArgument(this.numPartitions == null,
"partition counts should be null in unpartitioned case");
@@ -154,10 +156,10 @@ public class CartesianProductConfig {
}
/**
- * @return the array of source vertices names
+ * @return an unmodifiable list of source vertex (or source vertex group) names
*/
public List<String> getSourceVertices() {
- return Collections.unmodifiableList(Arrays.asList(sourceVertices));
+ return Collections.unmodifiableList(Arrays.asList(sources));
}
/**
@@ -187,7 +189,7 @@ public class CartesianProductConfig {
CartesianProductConfigProto.Builder builder =
CartesianProductConfigProto.newBuilder();
builder.setIsPartitioned(this.isPartitioned)
- .addAllSourceVertices(Arrays.asList(sourceVertices));
+ .addAllSources(Arrays.asList(sources));
if (isPartitioned) {
builder.addAllNumPartitions(Ints.asList(numPartitions));
@@ -220,7 +222,7 @@ public class CartesianProductConfig {
String desiredBytesPerGroup =
conf.get(CartesianProductVertexManager.TEZ_CARTESIAN_PRODUCT_DESIRED_BYTES_PER_GROUP);
if (desiredBytesPerGroup != null) {
- builder.setDesiredBytesPerGroup(Long.parseLong(desiredBytesPerGroup));
+ builder.setDesiredBytesPerChunk(Long.parseLong(desiredBytesPerGroup));
}
}
Preconditions.checkArgument(builder.getMinFraction() <= builder.getMaxFraction(),
@@ -246,10 +248,10 @@ public class CartesianProductConfig {
protected static CartesianProductConfig fromProto(
CartesianProductConfigProto proto) {
if (!proto.getIsPartitioned()) {
- return new CartesianProductConfig(proto.getSourceVerticesList());
+ return new CartesianProductConfig(proto.getSourcesList());
} else {
- String[] sourceVertices = new String[proto.getSourceVerticesList().size()];
- proto.getSourceVerticesList().toArray(sourceVertices);
+ String[] sourceVertices = new String[proto.getSourcesList().size()];
+ proto.getSourcesList().toArray(sourceVertices);
CartesianProductFilterDescriptor filterDescriptor = null;
if (proto.hasFilterClassName()) {
filterDescriptor = new CartesianProductFilterDescriptor(proto.getFilterClassName());
http://git-wip-us.apache.org/repos/asf/tez/blob/f355a050/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductEdgeManagerConfig.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductEdgeManagerConfig.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductEdgeManagerConfig.java
index 0347f67..df0bcfa 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductEdgeManagerConfig.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductEdgeManagerConfig.java
@@ -27,23 +27,18 @@ import java.nio.ByteBuffer;
import static org.apache.tez.runtime.library.cartesianproduct.CartesianProductUserPayload.CartesianProductConfigProto;
class CartesianProductEdgeManagerConfig extends CartesianProductConfig {
- private final int[] numTasks;
- private final int[] numGroups;
+ final int[] numChunksPerSrc;
+ final int numChunk;
+ final int chunkIdOffset;
protected CartesianProductEdgeManagerConfig(boolean isPartitioned, String[] sourceVertices,
- int[] numPartitions, int[] numTasks, int[] numGroups,
- CartesianProductFilterDescriptor filterDescriptor) {
+ int[] numPartitions, int[] numChunksPerSrc, int numChunk,
+ int chunkIdOffset,
+ CartesianProductFilterDescriptor filterDescriptor) {
super(isPartitioned, numPartitions, sourceVertices, filterDescriptor);
- this.numTasks = numTasks;
- this.numGroups = numGroups;
- }
-
- public int[] getNumTasks() {
- return this.numTasks;
- }
-
- public int[] getNumGroups() {
- return this.numGroups;
+ this.numChunksPerSrc = numChunksPerSrc;
+ this.numChunk = numChunk;
+ this.chunkIdOffset = chunkIdOffset;
}
public static CartesianProductEdgeManagerConfig fromUserPayload(UserPayload payload)
@@ -52,8 +47,8 @@ class CartesianProductEdgeManagerConfig extends CartesianProductConfig {
CartesianProductConfigProto.parseFrom(ByteString.copyFrom(payload.getPayload()));
boolean isPartitioned = proto.getIsPartitioned();
- String[] sourceVertices = new String[proto.getSourceVerticesList().size()];
- proto.getSourceVerticesList().toArray(sourceVertices);
+ String[] sources = new String[proto.getSourcesList().size()];
+ proto.getSourcesList().toArray(sources);
int[] numPartitions =
proto.getNumPartitionsCount() == 0 ? null : Ints.toArray(proto.getNumPartitionsList());
CartesianProductFilterDescriptor filterDescriptor = proto.hasFilterClassName()
@@ -62,11 +57,11 @@ class CartesianProductEdgeManagerConfig extends CartesianProductConfig {
filterDescriptor.setUserPayload(
UserPayload.create(ByteBuffer.wrap(proto.getFilterUserPayload().toByteArray())));
}
- int[] numTasks =
- proto.getNumTasksCount() == 0 ? null : Ints.toArray(proto.getNumTasksList());
- int[] numGroups =
- proto.getNumGroupsCount() == 0 ? null : Ints.toArray(proto.getNumGroupsList());
- return new CartesianProductEdgeManagerConfig(isPartitioned, sourceVertices, numPartitions,
- numTasks, numGroups, filterDescriptor);
+ int[] humChunksPerSrc =
+ proto.getNumChunksCount() == 0 ? null : Ints.toArray(proto.getNumChunksList());
+ int numChunk = proto.getNumChunk();
+ int chunkIdOffset = proto.getChunkIdOffset();
+ return new CartesianProductEdgeManagerConfig(isPartitioned, sources, numPartitions,
+ humChunksPerSrc, numChunk, chunkIdOffset, filterDescriptor);
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/tez/blob/f355a050/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductEdgeManagerPartitioned.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductEdgeManagerPartitioned.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductEdgeManagerPartitioned.java
index 068da81..5ece5cf 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductEdgeManagerPartitioned.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductEdgeManagerPartitioned.java
@@ -107,13 +107,13 @@ class CartesianProductEdgeManagerPartitioned extends CartesianProductEdgeManager
CartesianProductCombination combination =
new CartesianProductCombination(numPartitions);
combination.firstTask();
- List<String> sourceVertices = config.getSourceVertices();
+ List<String> sources = config.getSourceVertices();
do {
- for (int i = 0; i < sourceVertices.size(); i++) {
- vertexPartitionMap.put(sourceVertices.get(i), combination.getCombination().get(i));
+ for (int i = 0; i < sources.size(); i++) {
+ vertexPartitionMap.put(sources.get(i), combination.getCombination().get(i));
}
if (filter == null || filter.isValidCombination(vertexPartitionMap)) {
- idealTaskId.add(combination.getTaskId());
+ idealTaskId.add(combination.getChunkId());
}
} while (combination.nextTask());
this.taskIdMapping = Ints.toArray(idealTaskId);
http://git-wip-us.apache.org/repos/asf/tez/blob/f355a050/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductEdgeManagerReal.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductEdgeManagerReal.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductEdgeManagerReal.java
index 3e1407c..f22035b 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductEdgeManagerReal.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductEdgeManagerReal.java
@@ -18,7 +18,6 @@
package org.apache.tez.runtime.library.cartesianproduct;
import org.apache.tez.dag.api.EdgeManagerPluginContext;
-import org.apache.tez.dag.api.EdgeManagerPluginOnDemand;
import org.apache.tez.dag.api.EdgeManagerPluginOnDemand.EventRouteMetadata;
import org.apache.tez.dag.api.EdgeManagerPluginOnDemand.CompositeEventRouteMetadata;
http://git-wip-us.apache.org/repos/asf/tez/blob/f355a050/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductEdgeManagerUnpartitioned.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductEdgeManagerUnpartitioned.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductEdgeManagerUnpartitioned.java
index b9cb155..80d7dc1 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductEdgeManagerUnpartitioned.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductEdgeManagerUnpartitioned.java
@@ -27,7 +27,9 @@ import javax.annotation.Nullable;
class CartesianProductEdgeManagerUnpartitioned extends CartesianProductEdgeManagerReal {
private int positionId;
- private int[] numGroups;
+ private int numChunk;
+ private int chunkIdOffset;
+ private int[] numChunkPerSrc;
private int numDestinationConsumerTasks;
private Grouper grouper = new Grouper();
@@ -36,32 +38,38 @@ class CartesianProductEdgeManagerUnpartitioned extends CartesianProductEdgeManag
}
public void initialize(CartesianProductEdgeManagerConfig config) {
- positionId = config.getSourceVertices().indexOf(getContext().getSourceVertexName());
- this.numGroups = config.getNumGroups();
+ String groupName = getContext().getVertexGroupName();
+ String srcName = groupName != null ? groupName : getContext().getSourceVertexName();
+ this.positionId = config.getSourceVertices().indexOf(srcName);
+ this.numChunkPerSrc = config.numChunksPerSrc;
+ this.numChunk = config.numChunk;
+ this.chunkIdOffset = config.chunkIdOffset;
- if (numGroups != null && numGroups[positionId] != 0) {
- grouper.init(config.getNumTasks()[positionId], numGroups[positionId]);
+ if (numChunk != 0) {
+ grouper.init(getContext().getSourceVertexNumTasks(), numChunk);
numDestinationConsumerTasks = 1;
- for (int numGroup : numGroups) {
+ for (int numGroup : numChunkPerSrc) {
numDestinationConsumerTasks *= numGroup;
}
- numDestinationConsumerTasks /= numGroups[positionId];
+ numDestinationConsumerTasks /= numChunkPerSrc[positionId];
}
}
@Override
public int routeInputErrorEventToSource(int destTaskId, int failedInputId) throws Exception {
return failedInputId + grouper.getFirstTaskInGroup(
- CartesianProductCombination.fromTaskId(numGroups, destTaskId).getCombination().get(positionId));
+ CartesianProductCombination.fromTaskId(numChunkPerSrc, destTaskId).getCombination().get(positionId)
+ - chunkIdOffset);
}
@Override
public EventRouteMetadata routeDataMovementEventToDestination(int srcTaskId, int srcOutputId,
int destTaskId) throws Exception {
- int groupId =
- CartesianProductCombination.fromTaskId(numGroups, destTaskId).getCombination().get(positionId);
- if (grouper.isInGroup(srcTaskId, groupId)) {
- int idx = srcTaskId - grouper.getFirstTaskInGroup(groupId);
+ int chunkId =
+ CartesianProductCombination.fromTaskId(numChunkPerSrc, destTaskId).getCombination().get(positionId)
+ - chunkIdOffset;
+ if (0 <= chunkId && chunkId < numChunk && grouper.isInGroup(srcTaskId, chunkId)) {
+ int idx = srcTaskId - grouper.getFirstTaskInGroup(chunkId);
return EventRouteMetadata.create(1, new int[] {idx});
}
return null;
@@ -72,10 +80,11 @@ class CartesianProductEdgeManagerUnpartitioned extends CartesianProductEdgeManag
public CompositeEventRouteMetadata routeCompositeDataMovementEventToDestination(int srcTaskId,
int destTaskId)
throws Exception {
- int groupId =
- CartesianProductCombination.fromTaskId(numGroups, destTaskId).getCombination().get(positionId);
- if (grouper.isInGroup(srcTaskId, groupId)) {
- int idx = srcTaskId - grouper.getFirstTaskInGroup(groupId);
+ int chunkId =
+ CartesianProductCombination.fromTaskId(numChunkPerSrc, destTaskId).getCombination().get(positionId)
+ - chunkIdOffset;
+ if (0 <= chunkId && chunkId < numChunk && grouper.isInGroup(srcTaskId, chunkId)) {
+ int idx = srcTaskId - grouper.getFirstTaskInGroup(chunkId);
return CompositeEventRouteMetadata.create(1, idx, 0);
}
return null;
@@ -86,10 +95,11 @@ class CartesianProductEdgeManagerUnpartitioned extends CartesianProductEdgeManag
public EventRouteMetadata routeInputSourceTaskFailedEventToDestination(int srcTaskId,
int destTaskId)
throws Exception {
- int groupId =
- CartesianProductCombination.fromTaskId(numGroups, destTaskId).getCombination().get(positionId);
- if (grouper.isInGroup(srcTaskId, groupId)) {
- int idx = srcTaskId - grouper.getFirstTaskInGroup(groupId);
+ int chunkId =
+ CartesianProductCombination.fromTaskId(numChunkPerSrc, destTaskId).getCombination().get(positionId)
+ - chunkIdOffset;
+ if (0 <= chunkId && chunkId < numChunk && grouper.isInGroup(srcTaskId, chunkId)) {
+ int idx = srcTaskId - grouper.getFirstTaskInGroup(chunkId);
return EventRouteMetadata.create(1, new int[] {idx});
}
return null;
@@ -97,9 +107,10 @@ class CartesianProductEdgeManagerUnpartitioned extends CartesianProductEdgeManag
@Override
public int getNumDestinationTaskPhysicalInputs(int destTaskId) {
- int groupId =
- CartesianProductCombination.fromTaskId(numGroups, destTaskId).getCombination().get(positionId);
- return grouper.getNumTasksInGroup(groupId);
+ int chunkId =
+ CartesianProductCombination.fromTaskId(numChunkPerSrc, destTaskId).getCombination().get(positionId)
+ - chunkIdOffset;
+ return 0 <= chunkId && chunkId < numChunk ? grouper.getNumTasksInGroup(chunkId) : 0;
}
@Override
http://git-wip-us.apache.org/repos/asf/tez/blob/f355a050/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductVertexManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductVertexManager.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductVertexManager.java
index a104904..857f11e 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductVertexManager.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductVertexManager.java
@@ -30,6 +30,8 @@ import org.apache.tez.runtime.api.Event;
import org.apache.tez.runtime.api.TaskAttemptIdentifier;
import org.apache.tez.runtime.api.events.VertexManagerEvent;
+import java.util.Collections;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
@@ -45,6 +47,13 @@ import static org.apache.tez.dag.api.EdgeProperty.DataMovementType.CUSTOM;
*
* Predefined parallelism isn't allowed for cartesian product vertex. Parallellism has to be
* determined by vertex manager.
+ *
+ * If a vertex uses this vertex manager, its input edges must be either cartesian product edges or
+ * broadcast edges.
+ *
+ * Sources can be either vertices or vertex groups (only in unpartitioned case).
+ *
+ * Slow start only works in partitioned case. Auto grouping only works in unpartitioned case.
*/
public class CartesianProductVertexManager extends VertexManagerPlugin {
/**
@@ -95,26 +104,37 @@ public class CartesianProductVertexManager extends VertexManagerPlugin {
// check whether DAG and config are is consistent
Map<String, EdgeProperty> edgePropertyMap = getContext().getInputVertexEdgeProperties();
Set<String> sourceVerticesDAG = edgePropertyMap.keySet();
- Set<String> sourceVerticesConfig = new HashSet<>();
- sourceVerticesConfig.addAll(config.getSourceVertices());
+ Set<String> sourceVerticesConfig = new HashSet<>(config.getSourceVertices());
+
+ Map<String, List<String>> vertexGroups = getContext().getInputVertexGroups();
+ Map<String, String> vertexToGroup = new HashMap<>();
+ for (Map.Entry<String, List<String>> group : vertexGroups.entrySet()) {
+ for (String vertex : group.getValue()) {
+ vertexToGroup.put(vertex, group.getKey());
+ }
+ }
for (Map.Entry<String, EdgeProperty> entry : edgePropertyMap.entrySet()) {
String vertex = entry.getKey();
+ String group = vertexToGroup.get(vertex);
EdgeProperty edgeProperty = entry.getValue();
EdgeManagerPluginDescriptor empDescriptor = edgeProperty.getEdgeManagerDescriptor();
if (empDescriptor != null
&& empDescriptor.getClassName().equals(CartesianProductEdgeManager.class.getName())) {
- Preconditions.checkArgument(sourceVerticesConfig.contains(vertex),
+ Preconditions.checkArgument(
+ sourceVerticesConfig.contains(vertex) || sourceVerticesConfig.contains(group),
vertex + " has CartesianProductEdgeManager but isn't in " +
"CartesianProductVertexManagerConfig");
} else {
- Preconditions.checkArgument(!sourceVerticesConfig.contains(vertex),
+ Preconditions.checkArgument(
+ !sourceVerticesConfig.contains(vertex) && !sourceVerticesConfig.contains(group),
vertex + " has no CartesianProductEdgeManager but is in " +
"CartesianProductVertexManagerConfig");
}
if (edgeProperty.getDataMovementType() == CUSTOM) {
- Preconditions.checkArgument(sourceVerticesConfig.contains(vertex),
+ Preconditions.checkArgument(
+ sourceVerticesConfig.contains(vertex) || sourceVerticesConfig.contains(group),
"Only broadcast and cartesian product edges are allowed in cartesian product vertex");
} else {
Preconditions.checkArgument(edgeProperty.getDataMovementType() == BROADCAST,
@@ -122,14 +142,19 @@ public class CartesianProductVertexManager extends VertexManagerPlugin {
}
}
- for (String vertex : sourceVerticesConfig) {
- Preconditions.checkArgument(sourceVerticesDAG.contains(vertex),
- vertex + " is in CartesianProductVertexManagerConfig but not a source vertex in DAG");
- Preconditions.checkArgument(
- edgePropertyMap.get(vertex).getEdgeManagerDescriptor().getClassName()
- .equals(CartesianProductEdgeManager.class.getName()),
- vertex + " is in CartesianProductVertexManagerConfig and a source vertex, but has no " +
- "CartesianProductEdgeManager");
+ for (String src : sourceVerticesConfig) {
+ List<String> vertices =
+ vertexGroups.containsKey(src) ? vertexGroups.get(src) : Collections.singletonList(src);
+ for (String v : vertices) {
+ Preconditions.checkArgument(
+ sourceVerticesDAG.contains(v),
+ v + " is in CartesianProductVertexManagerConfig but not a source vertex in DAG");
+ Preconditions.checkArgument(
+ edgePropertyMap.get(v).getEdgeManagerDescriptor().getClassName()
+ .equals(CartesianProductEdgeManager.class.getName()),
+ v + " is in CartesianProductVertexManagerConfig and a source vertex, but has no " +
+ "CartesianProductEdgeManager");
+ }
}
vertexManagerReal = config.getIsPartitioned()
http://git-wip-us.apache.org/repos/asf/tez/blob/f355a050/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductVertexManagerConfig.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductVertexManagerConfig.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductVertexManagerConfig.java
index f43f494..e082ec3 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductVertexManagerConfig.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductVertexManagerConfig.java
@@ -28,32 +28,24 @@ import java.nio.ByteBuffer;
import static org.apache.tez.runtime.library.cartesianproduct.CartesianProductUserPayload.*;
class CartesianProductVertexManagerConfig extends CartesianProductConfig {
- private final float minFraction;
- private final float maxFraction;
- private final boolean enableAutoGrouping;
- private final long desiredBytesPerGroup;
+ final float minFraction;
+ final float maxFraction;
+ final boolean enableAutoGrouping;
+ final long desiredBytesPerChunk;
- public CartesianProductVertexManagerConfig(boolean isPartitioned, String[] sourceVertices,
+ public CartesianProductVertexManagerConfig(boolean isPartitioned, String[] sources,
int[] numPartitions,
float minFraction, float maxFraction,
- boolean enableAutoGrouping, long desiredBytesPerGroup,
+ boolean enableAutoGrouping, long desiredBytesPerChunk,
CartesianProductFilterDescriptor filterDescriptor) {
- super(isPartitioned, numPartitions, sourceVertices, filterDescriptor);
+ super(isPartitioned, numPartitions, sources, filterDescriptor);
Preconditions.checkArgument(minFraction <= maxFraction,
"min fraction(" + minFraction + ") should be less than max fraction(" +
maxFraction + ") in cartesian product slow start");
this.minFraction = minFraction;
this.maxFraction = maxFraction;
this.enableAutoGrouping = enableAutoGrouping;
- this.desiredBytesPerGroup = desiredBytesPerGroup;
- }
-
- public float getMinFraction() {
- return minFraction;
- }
-
- public float getMaxFraction() {
- return maxFraction;
+ this.desiredBytesPerChunk = desiredBytesPerChunk;
}
public static CartesianProductVertexManagerConfig fromUserPayload(UserPayload payload)
@@ -62,8 +54,8 @@ class CartesianProductVertexManagerConfig extends CartesianProductConfig {
CartesianProductConfigProto.parseFrom(ByteString.copyFrom(payload.getPayload()));
boolean isPartitioned = proto.getIsPartitioned();
- String[] sourceVertices = new String[proto.getSourceVerticesList().size()];
- proto.getSourceVerticesList().toArray(sourceVertices);
+ String[] sources = new String[proto.getSourcesList().size()];
+ proto.getSourcesList().toArray(sources);
int[] numPartitions =
proto.getNumPartitionsCount() == 0 ? null : Ints.toArray(proto.getNumPartitionsList());
CartesianProductFilterDescriptor filterDescriptor = proto.hasFilterClassName()
@@ -77,17 +69,9 @@ class CartesianProductVertexManagerConfig extends CartesianProductConfig {
boolean enableAutoGrouping = proto.hasEnableAutoGrouping() ? proto.getEnableAutoGrouping()
: CartesianProductVertexManager.TEZ_CARTESIAN_PRODUCT_ENABLE_AUTO_GROUPING_DEFAULT;
- long desiredBytesPerGroup = proto.hasDesiredBytesPerGroup() ? proto.getDesiredBytesPerGroup()
+ long desiredBytesPerGroup = proto.hasDesiredBytesPerChunk() ? proto.getDesiredBytesPerChunk()
: CartesianProductVertexManager.TEZ_CARTESIAN_PRODUCT_DESIRED_BYTES_PER_GROUP_DEFAULT;
- return new CartesianProductVertexManagerConfig(isPartitioned, sourceVertices, numPartitions,
+ return new CartesianProductVertexManagerConfig(isPartitioned, sources, numPartitions,
minFraction, maxFraction, enableAutoGrouping, desiredBytesPerGroup, filterDescriptor);
}
-
- public boolean isEnableAutoGrouping() {
- return enableAutoGrouping;
- }
-
- public long getDesiredBytesPerGroup() {
- return desiredBytesPerGroup;
- }
}
http://git-wip-us.apache.org/repos/asf/tez/blob/f355a050/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductVertexManagerPartitioned.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductVertexManagerPartitioned.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductVertexManagerPartitioned.java
index 85c04d2..ddff37d 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductVertexManagerPartitioned.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductVertexManagerPartitioned.java
@@ -174,12 +174,12 @@ class CartesianProductVertexManagerPartitioned extends CartesianProductVertexMan
// determine the destination task with largest id to schedule
float percentFinishedSrcTask = numFinishedSrcTasks*1f/totalNumSrcTasks;
int numTaskToSchedule;
- if (percentFinishedSrcTask < config.getMinFraction()) {
+ if (percentFinishedSrcTask < config.minFraction) {
numTaskToSchedule = 0;
- } else if (config.getMinFraction() <= percentFinishedSrcTask &&
- percentFinishedSrcTask <= config.getMaxFraction()) {
- numTaskToSchedule = (int) ((percentFinishedSrcTask-config.getMinFraction())
- /(config.getMaxFraction()-config.getMinFraction())*parallelism);
+ } else if (config.minFraction <= percentFinishedSrcTask &&
+ percentFinishedSrcTask <= config.maxFraction) {
+ numTaskToSchedule = (int) ((percentFinishedSrcTask-config.minFraction)
+ /(config.maxFraction-config.minFraction)*parallelism);
} else {
numTaskToSchedule = parallelism;
}
http://git-wip-us.apache.org/repos/asf/tez/blob/f355a050/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductVertexManagerUnpartitioned.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductVertexManagerUnpartitioned.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductVertexManagerUnpartitioned.java
index 993cb40..46ea76e 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductVertexManagerUnpartitioned.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductVertexManagerUnpartitioned.java
@@ -19,7 +19,6 @@ package org.apache.tez.runtime.library.cartesianproduct;
import com.google.common.primitives.Ints;
import com.google.protobuf.ByteString;
-import org.apache.tez.dag.api.EdgeManagerPluginDescriptor;
import org.apache.tez.dag.api.EdgeProperty;
import org.apache.tez.dag.api.UserPayload;
import org.apache.tez.dag.api.VertexManagerPluginContext;
@@ -36,7 +35,6 @@ import org.slf4j.Logger;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
@@ -50,31 +48,132 @@ import java.util.Set;
import static org.apache.tez.dag.api.EdgeProperty.DataMovementType.CUSTOM;
import static org.apache.tez.runtime.library.cartesianproduct.CartesianProductUserPayload.CartesianProductConfigProto;
+/**
+ * In unpartitioned case, we have one destination task for each source chunk combination. A source
+ * is a source vertex or a source vertex group. A chunk is one source task (without auto grouping)
+ * or a group of source tasks (with auto grouping). A chunk may contain multiple tasks across
+ * vertices. The mapping from source chunk to destination task id is done by
+ * {@link CartesianProductCombination}.
+ *
+ * If auto grouping is enabled, this vertex manager will estimate the output size of each source and
+ * group the source tasks of each source into chunks according to the desired size configured by user.
+ *
+ *
+ */
class CartesianProductVertexManagerUnpartitioned extends CartesianProductVertexManagerReal {
+ /**
+ * a cartesian product source
+ */
+ static class Source {
+ // list of source vertices of this source
+ List<SrcVertex> srcVertices = new ArrayList<>();
+ // position of this source in all sources
+ int position;
+ // name of source vertex or vertex group
+ String name;
+
+ // total number of chunks in this source
+ public int getNumChunk() {
+ int numChunk = 0;
+ for (SrcVertex srcV : srcVertices) {
+ numChunk += srcV.numChunk;
+ }
+ return numChunk;
+ }
+
+ // whether this source has any task completed
+ public boolean hasTaskCompleted() {
+ for (SrcVertex srcV : srcVertices) {
+ if (!srcV.taskCompleted.isEmpty()) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ public String toString(boolean afterReconfigure) {
+ StringBuilder sb = new StringBuilder();
+ sb.append("Source at position ");
+ sb.append(position);
+ if (name != null) {
+ sb.append(", ");
+ sb.append("vertex group ");
+ sb.append(name);
+
+ }
+ sb.append(": {");
+ for (SrcVertex srcV : srcVertices) {
+ sb.append("[");
+ sb.append(srcV.toString(afterReconfigure));
+ sb.append("], ");
+ }
+ sb.deleteCharAt(sb.length() - 1);
+ sb.setCharAt(sb.length() - 1, '}');
+ return sb.toString();
+ }
+ }
+
+ /**
+ * a cartesian product source vertex
+ */
+ class SrcVertex {
+ // which source this vertex belongs to
+ Source source;
+ // vertex name
+ String name;
+ int numTask;
+ // num chunks of this source vertex
+ int numChunk;
+ // offset of chunk id in vertex group
+ // we need to sequence chunks in the vertex group to make them look like they come from a single vertex
+ int chunkIdOffset = 0;
+ RoaringBitmap taskCompleted = new RoaringBitmap();
+ RoaringBitmap taskWithVMEvent = new RoaringBitmap();
+ long outputBytes;
+
+ public void doGrouping() {
+ numChunk = numTask;
+ if (config.enableAutoGrouping) {
+ outputBytes = outputBytes * numTask / taskWithVMEvent.getCardinality();
+ numChunk = Math.min(numChunk,
+ (int) ((outputBytes + config.desiredBytesPerChunk - 1) / config.desiredBytesPerChunk));
+ }
+ }
+
+ public String toString(boolean afterReconfigure) {
+ StringBuilder sb = new StringBuilder();
+ sb.append("vertex ").append(name).append(", ");
+ if (afterReconfigure) {
+ sb.append("estimated output ").append(outputBytes).append(" bytes, ");
+ sb.append(numChunk).append(" chunks");
+ } else {
+ sb.append(numTask).append(" tasks, ");
+ sb.append(taskWithVMEvent.getCardinality()).append(" VMEvents, ");
+ sb.append("output ").append(outputBytes).append(" bytes");
+ }
+ return sb.toString();
+ }
+ }
+
private static final Logger LOG =
org.slf4j.LoggerFactory.getLogger(CartesianProductVertexManagerUnpartitioned.class);
- List<String> sourceVertices;
- private int parallelism = 1;
+ CartesianProductVertexManagerConfig config;
+ Map<String, Source> sourcesByName = new HashMap<>();
+ Map<String, SrcVertex> srcVerticesByName = new HashMap<>();
+
private boolean vertexReconfigured = false;
private boolean vertexStarted = false;
private boolean vertexStartSchedule = false;
private int numCPSrcNotInConfigureState = 0;
private int numBroadcastSrcNotInRunningState = 0;
- private int[] numTasks;
-
private Queue<TaskAttemptIdentifier> completedSrcTaskToProcess = new LinkedList<>();
- private Map<String, RoaringBitmap> sourceTaskCompleted = new HashMap<>();
private RoaringBitmap scheduledTasks = new RoaringBitmap();
- private CartesianProductConfig config;
/* auto reduce related */
- private int[] numGroups;
+ // num of chunks of source at the corresponding position in source list
+ private int[] numChunksPerSrc;
private Set<String> vertexSentVME = new HashSet<>();
- private long[] vertexOutputBytes;
- private int[] numVertexManagerEventsReceived;
- private long desiredBytesPerGroup;
- private boolean enableGrouping;
private Grouper grouper = new Grouper();
public CartesianProductVertexManagerUnpartitioned(VertexManagerPluginContext context) {
@@ -83,29 +182,39 @@ class CartesianProductVertexManagerUnpartitioned extends CartesianProductVertexM
@Override
public void initialize(CartesianProductVertexManagerConfig config) throws Exception {
- sourceVertices = config.getSourceVertices();
- numTasks = new int[sourceVertices.size()];
- numGroups = new int[sourceVertices.size()];
- vertexOutputBytes = new long[sourceVertices.size()];
- numVertexManagerEventsReceived = new int[sourceVertices.size()];
-
- enableGrouping = config.isEnableAutoGrouping();
- desiredBytesPerGroup = config.getDesiredBytesPerGroup();
-
- for (String vertex : sourceVertices) {
- sourceTaskCompleted.put(vertex, new RoaringBitmap());
- }
-
- for (String vertex : getContext().getInputVertexEdgeProperties().keySet()) {
- if (sourceVertices.indexOf(vertex) != -1) {
- sourceTaskCompleted.put(vertex, new RoaringBitmap());
- getContext().registerForVertexStateUpdates(vertex, EnumSet.of(VertexState.CONFIGURED));
+ for (Map.Entry<String, EdgeProperty> e : getContext().getInputVertexEdgeProperties().entrySet()) {
+ if (e.getValue().getDataMovementType() == CUSTOM
+ && e.getValue().getEdgeManagerDescriptor().getClassName()
+ .equals(CartesianProductEdgeManager.class.getName())) {
+ srcVerticesByName.put(e.getKey(), new SrcVertex());
+ srcVerticesByName.get(e.getKey()).name = e.getKey();
+ getContext().registerForVertexStateUpdates(e.getKey(), EnumSet.of(VertexState.CONFIGURED));
numCPSrcNotInConfigureState++;
} else {
- getContext().registerForVertexStateUpdates(vertex, EnumSet.of(VertexState.RUNNING));
+ getContext().registerForVertexStateUpdates(e.getKey(), EnumSet.of(VertexState.RUNNING));
numBroadcastSrcNotInRunningState++;
}
}
+
+ Map<String, List<String>> srcGroups = getContext().getInputVertexGroups();
+ for (int i = 0; i < config.getSourceVertices().size(); i++) {
+ String srcName = config.getSourceVertices().get(i);
+ Source source = new Source();
+ source.position = i;
+ if (srcGroups.containsKey(srcName)) {
+ source.name = srcName;
+ for (String srcVName : srcGroups.get(srcName)) {
+ source.srcVertices.add(srcVerticesByName.get(srcVName));
+ srcVerticesByName.get(srcVName).source = source;
+ }
+ } else {
+ source.srcVertices.add(srcVerticesByName.get(srcName));
+ srcVerticesByName.get(srcName).source = source;
+ }
+ sourcesByName.put(srcName, source);
+ }
+
+ numChunksPerSrc = new int[sourcesByName.size()];
this.config = config;
getContext().vertexReconfigurationPlanned();
}
@@ -128,7 +237,7 @@ class CartesianProductVertexManagerUnpartitioned extends CartesianProductVertexM
VertexState state = stateUpdate.getVertexState();
if (state == VertexState.CONFIGURED) {
- numTasks[sourceVertices.indexOf(vertex)] = getContext().getVertexNumTasks(vertex);
+ srcVerticesByName.get(vertex).numTask = getContext().getVertexNumTasks(vertex);
numCPSrcNotInConfigureState--;
} else if (state == VertexState.RUNNING) {
numBroadcastSrcNotInRunningState--;
@@ -145,22 +254,20 @@ class CartesianProductVertexManagerUnpartitioned extends CartesianProductVertexM
private void addCompletedSrcTaskToProcess(TaskAttemptIdentifier attempt) {
int taskId = attempt.getTaskIdentifier().getIdentifier();
String vertex = attempt.getTaskIdentifier().getVertexIdentifier().getName();
- if (sourceVertices.indexOf(vertex) == -1) {
- return;
- }
- if (sourceTaskCompleted.get(vertex).contains(taskId)) {
- return;
+ SrcVertex srcV = srcVerticesByName.get(vertex);
+ if (srcV != null && !srcV.taskCompleted.contains(taskId)) {
+ srcV.taskCompleted.add(taskId);
+ completedSrcTaskToProcess.add(attempt);
}
- sourceTaskCompleted.get(vertex).add(taskId);
- completedSrcTaskToProcess.add(attempt);
}
private boolean tryStartSchedule() {
if (!vertexReconfigured || !vertexStarted || numBroadcastSrcNotInRunningState > 0) {
return false;
}
- for (RoaringBitmap bitmap: sourceTaskCompleted.values()) {
- if (bitmap.isEmpty()) {
+
+ for (Source src : sourcesByName.values()) {
+ if (!src.hasTaskCompleted()) {
return false;
}
}
@@ -178,15 +285,17 @@ class CartesianProductVertexManagerUnpartitioned extends CartesianProductVertexM
if (vmEvent.getUserPayload() != null) {
String srcVertex =
vmEvent.getProducerAttemptIdentifier().getTaskIdentifier().getVertexIdentifier().getName();
- int position = sourceVertices.indexOf(srcVertex);
+ SrcVertex srcV = srcVerticesByName.get(srcVertex);
+
// vmEvent from non-cp vertex doesn't matter
- if (position == -1) {
+ if (srcV == null) {
return;
}
+
VertexManagerEventPayloadProto proto =
VertexManagerEventPayloadProto.parseFrom(ByteString.copyFrom(vmEvent.getUserPayload()));
- vertexOutputBytes[position] += proto.getOutputSize();
- numVertexManagerEventsReceived[position]++;
+ srcV.outputBytes += proto.getOutputSize();
+ srcV.taskWithVMEvent.add(vmEvent.getProducerAttemptIdentifier().getTaskIdentifier().getIdentifier());
vertexSentVME.add(srcVertex);
}
@@ -197,60 +306,65 @@ class CartesianProductVertexManagerUnpartitioned extends CartesianProductVertexM
if (numCPSrcNotInConfigureState > 0) {
return false;
}
- if (enableGrouping) {
- if (vertexSentVME.size() != sourceVertices.size()) {
+ if (config.enableAutoGrouping) {
+ if (vertexSentVME.size() != srcVerticesByName.size()) {
return false;
}
- for (int i = 0; i < vertexOutputBytes.length; i++) {
- if (vertexOutputBytes[i] < desiredBytesPerGroup
- && numVertexManagerEventsReceived[i] < numTasks[i]) {
+ // wait until every src vertex has reported at least one chunk size of output, or all of its tasks have sent VM events
+ for (SrcVertex srcV : srcVerticesByName.values()) {
+ if (srcV.outputBytes < config.desiredBytesPerChunk
+ && srcV.taskWithVMEvent.getCardinality() < srcV.numTask) {
return false;
}
}
}
- LOG.info("Start reconfigure, grouping: " + enableGrouping
- + ", group size: " + desiredBytesPerGroup);
- LOG.info("src vertices: " + sourceVertices);
- LOG.info("number of source tasks in each src: " + Arrays.toString(numTasks));
- LOG.info("number of vmEvent from each src: "
- + Arrays.toString(numVertexManagerEventsReceived));
- LOG.info("output stats of each src: " + Arrays.toString(vertexOutputBytes));
-
- for (int i = 0; i < numTasks.length; i++) {
- if (enableGrouping) {
- vertexOutputBytes[i] =
- vertexOutputBytes[i] * numTasks[i] / numVertexManagerEventsReceived[i];
- int desiredNumGroup =
- (int) ((vertexOutputBytes[i] + desiredBytesPerGroup - 1) / desiredBytesPerGroup);
- numGroups[i] = Math.min(numTasks[i], desiredNumGroup);
- } else {
- numGroups[i] = numTasks[i];
+ LOG.info("Start reconfigure, grouping: " + config.enableAutoGrouping
+ + ", chunk size: " + config.desiredBytesPerChunk + " bytes.");
+ for (String srcName : config.getSourceVertices()) {
+ LOG.info(sourcesByName.get(srcName).toString(false));
+ }
+
+ for (Source src : sourcesByName.values()) {
+ for (int i = 0; i < src.srcVertices.size(); i++) {
+ src.srcVertices.get(i).doGrouping();
+ if (i > 0) {
+ src.srcVertices.get(i).chunkIdOffset += src.srcVertices.get(i-1).numChunk;
+ }
}
- parallelism *= numGroups[i];
+ numChunksPerSrc[src.position] = src.getNumChunk();
}
- LOG.info("estimated output size of each src: " + Arrays.toString(vertexOutputBytes));
- LOG.info("number of groups for each src: " + Arrays.toString(numGroups));
+ int parallelism = 1;
+ for (Source src : sourcesByName.values()) {
+ parallelism *= src.getNumChunk();
+ }
+
+ LOG.info("After reconfigure, ");
+ for (String srcName : config.getSourceVertices()) {
+ LOG.info(sourcesByName.get(srcName).toString(true));
+ }
LOG.info("Final parallelism: " + parallelism);
- UserPayload payload = null;
+ CartesianProductConfigProto.Builder builder = CartesianProductConfigProto.newBuilder();
+ for (int i = 0; i < numChunksPerSrc.length; i++) {
+ numChunksPerSrc[i] = sourcesByName.get(config.getSourceVertices().get(i)).getNumChunk();
+ }
+ builder.setIsPartitioned(false).addAllSources(config.getSourceVertices())
+ .addAllNumChunks(Ints.asList(this.numChunksPerSrc));
+
Map<String, EdgeProperty> edgeProperties = getContext().getInputVertexEdgeProperties();
Iterator<Map.Entry<String,EdgeProperty>> iter = edgeProperties.entrySet().iterator();
while (iter.hasNext()) {
- EdgeProperty edgeProperty = iter.next().getValue();
- if (edgeProperty.getDataMovementType() != CUSTOM) {
+ Map.Entry<String, EdgeProperty> e = iter.next();
+ if (e.getValue().getDataMovementType() != CUSTOM) {
iter.remove();
- continue;
- }
- EdgeManagerPluginDescriptor descriptor = edgeProperty.getEdgeManagerDescriptor();
- if (payload == null) {
- CartesianProductConfigProto.Builder builder = CartesianProductConfigProto.newBuilder();
- builder.setIsPartitioned(false).addAllNumTasks(Ints.asList(numTasks))
- .addAllNumGroups(Ints.asList(numGroups)).addAllSourceVertices(config.getSourceVertices());
- payload = UserPayload.create(ByteBuffer.wrap(builder.build().toByteArray()));
+ } else {
+ SrcVertex srcV = srcVerticesByName.get(e.getKey());
+ builder.setNumChunk(srcV.numChunk).setChunkIdOffset(srcV.chunkIdOffset);
+ e.getValue().getEdgeManagerDescriptor()
+ .setUserPayload(UserPayload.create(ByteBuffer.wrap(builder.build().toByteArray())));
}
- descriptor.setUserPayload(payload);
}
getContext().reconfigureVertex(parallelism, null, edgeProperties);
vertexReconfigured = true;
@@ -267,32 +381,42 @@ class CartesianProductVertexManagerUnpartitioned extends CartesianProductVertexM
}
while (!completedSrcTaskToProcess.isEmpty()) {
- scheduledTasksDependOnCompletion(completedSrcTaskToProcess.poll());
+ scheduleTasksDependOnCompletion(completedSrcTaskToProcess.poll());
}
}
- private void scheduledTasksDependOnCompletion(TaskAttemptIdentifier attempt) {
+ private void scheduleTasksDependOnCompletion(TaskAttemptIdentifier attempt) {
int taskId = attempt.getTaskIdentifier().getIdentifier();
String vertex = attempt.getTaskIdentifier().getVertexIdentifier().getName();
- int position = sourceVertices.indexOf(vertex);
+ SrcVertex srcV = srcVerticesByName.get(vertex);
+ Source src = srcV.source;
List<ScheduleTaskRequest> requests = new ArrayList<>();
CartesianProductCombination combination =
- new CartesianProductCombination(numGroups, position);
- grouper.init(numTasks[position], numGroups[position]);
- combination.firstTaskWithFixedPartition(grouper.getGroupId(taskId));
+ new CartesianProductCombination(numChunksPerSrc, src.position);
+ grouper.init(srcV.numTask, srcV.numChunk);
+ combination.firstTaskWithFixedChunk(grouper.getGroupId(taskId) + srcV.chunkIdOffset);
do {
List<Integer> list = combination.getCombination();
- if (scheduledTasks.contains(combination.getTaskId())) {
+ if (scheduledTasks.contains(combination.getChunkId())) {
continue;
}
boolean readyToSchedule = true;
for (int i = 0; i < list.size(); i++) {
- int group = list.get(i);
- grouper.init(numTasks[i], numGroups[i]);
- for (int j = grouper.getFirstTaskInGroup(group); j <= grouper.getLastTaskInGroup(group); j++) {
- if (!sourceTaskCompleted.get(sourceVertices.get(i)).contains(j)) {
+ int chunkId = list.get(i);
+ SrcVertex srcVHasGroup = null;
+ for (SrcVertex v : sourcesByName.get(config.getSourceVertices().get(i)).srcVertices) {
+ if (v.chunkIdOffset <= chunkId && chunkId < v.chunkIdOffset + v.numChunk) {
+ srcVHasGroup = v;
+ break;
+ }
+ }
+ assert srcVHasGroup != null;
+ grouper.init(srcVHasGroup.numTask, srcVHasGroup.numChunk);
+ chunkId -= srcVHasGroup.chunkIdOffset;
+ for (int j = grouper.getFirstTaskInGroup(chunkId); j <= grouper.getLastTaskInGroup(chunkId); j++) {
+ if (!srcVHasGroup.taskCompleted.contains(j)) {
readyToSchedule = false;
break;
}
@@ -303,10 +427,10 @@ class CartesianProductVertexManagerUnpartitioned extends CartesianProductVertexM
}
if (readyToSchedule) {
- requests.add(ScheduleTaskRequest.create(combination.getTaskId(), null));
- scheduledTasks.add(combination.getTaskId());
+ requests.add(ScheduleTaskRequest.create(combination.getChunkId(), null));
+ scheduledTasks.add(combination.getChunkId());
}
- } while (combination.nextTaskWithFixedPartition());
+ } while (combination.nextTaskWithFixedChunk());
if (!requests.isEmpty()) {
getContext().scheduleTasks(requests);
}
http://git-wip-us.apache.org/repos/asf/tez/blob/f355a050/tez-runtime-library/src/main/proto/CartesianProductPayload.proto
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/proto/CartesianProductPayload.proto b/tez-runtime-library/src/main/proto/CartesianProductPayload.proto
index dd7d06f..cb503ea 100644
--- a/tez-runtime-library/src/main/proto/CartesianProductPayload.proto
+++ b/tez-runtime-library/src/main/proto/CartesianProductPayload.proto
@@ -21,14 +21,15 @@ option java_outer_classname = "CartesianProductUserPayload";
message CartesianProductConfigProto {
required bool isPartitioned = 1;
- repeated string sourceVertices = 2;
+ repeated string sources = 2;
repeated int32 numPartitions = 3;
optional string filterClassName = 4;
optional bytes filterUserPayload = 5;
optional float minFraction = 6;
optional float maxFraction = 7;
optional bool enableAutoGrouping = 8;
- optional int64 desiredBytesPerGroup = 9;
- repeated int32 numTasks = 10;
- repeated int32 numGroups = 11;
+ optional int64 desiredBytesPerChunk = 9;
+ repeated int32 numChunks = 10;
+ optional int32 numChunk = 11;
+ optional int32 chunkIdOffset = 12;
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/tez/blob/f355a050/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManagerUtils.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManagerUtils.java b/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManagerUtils.java
index 9a5614e..439d650 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManagerUtils.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManagerUtils.java
@@ -331,6 +331,11 @@ public class TestShuffleVertexManagerUtils {
public int getDestinationVertexNumTasks() {
return numTasks;
}
+
+ @Override
+ public String getVertexGroupName() {
+ return null;
+ }
};
if (newEdgeManagers != null) {
EdgeManagerPlugin edgeManager = ReflectionUtils
http://git-wip-us.apache.org/repos/asf/tez/blob/f355a050/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductCombination.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductCombination.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductCombination.java
index 06d3e90..3755ac8 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductCombination.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductCombination.java
@@ -30,44 +30,44 @@ import static org.junit.Assert.assertTrue;
public class TestCartesianProductCombination {
private void verifyCombination(CartesianProductCombination combination, int[] result, int taskId) {
assertArrayEquals(result, Ints.toArray(combination.getCombination()));
- assertEquals(taskId, combination.getTaskId());
+ assertEquals(taskId, combination.getChunkId());
}
private void testCombinationTwoWayVertex0() {
CartesianProductCombination combination = new CartesianProductCombination(new int[]{2,3}, 0);
- combination.firstTaskWithFixedPartition(1);
+ combination.firstTaskWithFixedChunk(1);
verifyCombination(combination, new int[]{1,0}, 3);
- assertTrue(combination.nextTaskWithFixedPartition());
+ assertTrue(combination.nextTaskWithFixedChunk());
verifyCombination(combination, new int[]{1,1}, 4);
- assertTrue(combination.nextTaskWithFixedPartition());
+ assertTrue(combination.nextTaskWithFixedChunk());
verifyCombination(combination, new int[]{1,2}, 5);
- assertFalse(combination.nextTaskWithFixedPartition());
+ assertFalse(combination.nextTaskWithFixedChunk());
}
private void testCombinationTwoWayVertex1() {
CartesianProductCombination combination = new CartesianProductCombination(new int[]{2,3}, 1);
- combination.firstTaskWithFixedPartition(1);
+ combination.firstTaskWithFixedChunk(1);
verifyCombination(combination, new int[]{0,1}, 1);
- assertTrue(combination.nextTaskWithFixedPartition());
+ assertTrue(combination.nextTaskWithFixedChunk());
verifyCombination(combination, new int[]{1,1}, 4);
- assertFalse(combination.nextTaskWithFixedPartition());
+ assertFalse(combination.nextTaskWithFixedChunk());
}
private void testCombinationThreeWay() {
CartesianProductCombination combination = new CartesianProductCombination(new int[]{2,2,2}, 1);
- combination.firstTaskWithFixedPartition(1);
+ combination.firstTaskWithFixedChunk(1);
verifyCombination(combination, new int[]{0,1,0}, 2);
- assertTrue(combination.nextTaskWithFixedPartition());
+ assertTrue(combination.nextTaskWithFixedChunk());
verifyCombination(combination, new int[]{0,1,1}, 3);
- assertTrue(combination.nextTaskWithFixedPartition());
+ assertTrue(combination.nextTaskWithFixedChunk());
verifyCombination(combination, new int[]{1,1,0}, 6);
- assertTrue(combination.nextTaskWithFixedPartition());
+ assertTrue(combination.nextTaskWithFixedChunk());
verifyCombination(combination, new int[]{1,1,1}, 7);
- assertFalse(combination.nextTaskWithFixedPartition());
+ assertFalse(combination.nextTaskWithFixedChunk());
}
@Test(timeout = 5000)
@@ -110,9 +110,9 @@ public class TestCartesianProductCombination {
@Test(timeout = 5000)
public void testRejectZero() {
- int[] numTasks = new int[] {0 ,1};
+ int[] numChunk = new int[] {0 ,1};
try {
- new CartesianProductCombination(numTasks);
+ new CartesianProductCombination(numChunk);
assertTrue(false);
} catch (Exception ignored) {}
}
http://git-wip-us.apache.org/repos/asf/tez/blob/f355a050/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductConfig.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductConfig.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductConfig.java
index c9e49a3..4857749 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductConfig.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductConfig.java
@@ -122,15 +122,15 @@ public class TestCartesianProductConfig {
// auto grouping conf not set
CartesianProductConfigProto proto = config.toProto(conf);
assertFalse(proto.hasEnableAutoGrouping());
- assertFalse(proto.hasDesiredBytesPerGroup());
+ assertFalse(proto.hasDesiredBytesPerChunk());
// auto grouping conf set
conf.setBoolean(CartesianProductVertexManager.TEZ_CARTESIAN_PRODUCT_ENABLE_AUTO_GROUPING, true);
conf.setLong(CartesianProductVertexManager.TEZ_CARTESIAN_PRODUCT_DESIRED_BYTES_PER_GROUP, 1000);
proto = config.toProto(conf);
assertTrue(proto.hasEnableAutoGrouping());
- assertTrue(proto.hasDesiredBytesPerGroup());
+ assertTrue(proto.hasDesiredBytesPerChunk());
assertEquals(true, proto.getEnableAutoGrouping());
- assertEquals(1000, proto.getDesiredBytesPerGroup());
+ assertEquals(1000, proto.getDesiredBytesPerChunk());
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/tez/blob/f355a050/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductEdgeManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductEdgeManager.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductEdgeManager.java
index 12aee3b..d722932 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductEdgeManager.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductEdgeManager.java
@@ -40,7 +40,7 @@ public class TestCartesianProductEdgeManager {
// partitioned case
CartesianProductConfigProto.Builder builder = CartesianProductConfigProto.newBuilder();
builder.setIsPartitioned(true)
- .addAllSourceVertices(Arrays.asList("v0", "v1"))
+ .addAllSources(Arrays.asList("v0", "v1"))
.addAllNumPartitions(Ints.asList(2,3));
UserPayload payload = UserPayload.create(ByteBuffer.wrap(builder.build().toByteArray()));
when(context.getUserPayload()).thenReturn(payload);
@@ -51,8 +51,8 @@ public class TestCartesianProductEdgeManager {
// unpartitioned case
builder.clear();
builder.setIsPartitioned(false)
- .addAllSourceVertices(Arrays.asList("v0", "v1"))
- .addAllNumTasks(Ints.asList(2,3));
+ .addAllSources(Arrays.asList("v0", "v1"))
+ .addAllNumChunks(Ints.asList(2,3));
payload = UserPayload.create(ByteBuffer.wrap(builder.build().toByteArray()));
when(context.getUserPayload()).thenReturn(payload);
when(context.getSourceVertexNumTasks()).thenReturn(2);
http://git-wip-us.apache.org/repos/asf/tez/blob/f355a050/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductEdgeManagerConfig.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductEdgeManagerConfig.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductEdgeManagerConfig.java
index 9f6fa09..3ba6aad 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductEdgeManagerConfig.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductEdgeManagerConfig.java
@@ -28,23 +28,26 @@ import java.util.ArrayList;
import java.util.List;
import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
public class TestCartesianProductEdgeManagerConfig {
@Test(timeout = 5000)
- public void testAutoGroupingConfig() throws IOException {
+ public void testUnpartitionedAutoGroupingConfig() throws IOException {
List<String> sourceVertices = new ArrayList<>();
sourceVertices.add("v0");
sourceVertices.add("v1");
- int[] numTasks = new int[] {4, 5};
- int[] numGroups = new int[] {2, 3};
+ int[] numChunkPerSrc = new int[] {2, 3};
+ int numGroup = 3, chunkIdOffset = 0;
CartesianProductConfigProto.Builder builder = CartesianProductConfigProto.newBuilder();
- builder.setIsPartitioned(false).addAllNumTasks(Ints.asList(numTasks))
- .addAllSourceVertices(sourceVertices).addAllNumGroups(Ints.asList(numGroups));
+ builder.setIsPartitioned(false).addAllNumChunks(Ints.asList(numChunkPerSrc))
+ .addAllSources(sourceVertices).setNumChunk(numGroup).setChunkIdOffset(chunkIdOffset);
UserPayload payload = UserPayload.create(ByteBuffer.wrap(builder.build().toByteArray()));
CartesianProductEdgeManagerConfig config =
CartesianProductEdgeManagerConfig.fromUserPayload(payload);
- assertArrayEquals(numGroups, config.getNumGroups());
+ assertArrayEquals(numChunkPerSrc, config.numChunksPerSrc);
+ assertEquals(numGroup, config.numChunk);
+ assertEquals(chunkIdOffset, config.chunkIdOffset);
}
}