You are viewing a plain-text version of this content; the hyperlink to the canonical version was not preserved in this copy.
Posted to commits@hive.apache.org by vi...@apache.org on 2014/10/29 01:41:42 UTC
svn commit: r1635016 - /hive/branches/branch-0.14/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionVertex.java
Author: vikram
Date: Wed Oct 29 00:41:41 2014
New Revision: 1635016
URL: http://svn.apache.org/r1635016
Log:
HIVE-8597: SMB join small table side should use the same set of serialized payloads across tasks (Siddharth Seth via Vikram Dixit)
Modified:
hive/branches/branch-0.14/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionVertex.java
Modified: hive/branches/branch-0.14/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionVertex.java
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.14/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionVertex.java?rev=1635016&r1=1635015&r2=1635016&view=diff
==============================================================================
--- hive/branches/branch-0.14/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionVertex.java (original)
+++ hive/branches/branch-0.14/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionVertex.java Wed Oct 29 00:41:41 2014
@@ -32,6 +32,7 @@ import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
+import com.google.common.collect.LinkedListMultimap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@@ -109,7 +110,6 @@ public class CustomPartitionVertex exten
VertexManagerPluginContext context;
private InputConfigureVertexTasksEvent configureVertexTaskEvent;
- private List<InputDataInformationEvent> dataInformationEvents;
private int numBuckets = -1;
private Configuration conf = null;
private final SplitGrouper grouper = new SplitGrouper();
@@ -223,8 +223,6 @@ public class CustomPartitionVertex exten
configureVertexTaskEvent = cEvent;
LOG.info("Configure task for input name: " + inputName + " num tasks: "
+ configureVertexTaskEvent.getNumTasks());
- dataInformationEvents =
- Lists.newArrayListWithCapacity(configureVertexTaskEvent.getNumTasks());
}
if (event instanceof InputUpdatePayloadEvent) {
// this event can never occur. If it does, fail.
@@ -232,7 +230,6 @@ public class CustomPartitionVertex exten
} else if (event instanceof InputDataInformationEvent) {
dataInformationEventSeen = true;
InputDataInformationEvent diEvent = (InputDataInformationEvent) event;
- dataInformationEvents.add(diEvent);
FileSplit fileSplit;
try {
fileSplit = getFileSplitFromEvent(diEvent);
@@ -341,16 +338,26 @@ public class CustomPartitionVertex exten
+ " multi mr inputs. " + bucketToTaskMap);
Integer[] numSplitsForTask = new Integer[taskCount];
+
+ Multimap<Integer, ByteBuffer> bucketToSerializedSplitMap = LinkedListMultimap.create();
+
+ // Create the list of serialized splits for each bucket.
for (Entry<Integer, Collection<InputSplit>> entry : bucketToGroupedSplitMap.asMap().entrySet()) {
+ for (InputSplit split : entry.getValue()) {
+ MRSplitProto serializedSplit = MRInputHelpers.createSplitProto(split);
+ ByteBuffer bs = serializedSplit.toByteString().asReadOnlyByteBuffer();
+ bucketToSerializedSplitMap.put(entry.getKey(), bs);
+ }
+ }
+
+ for (Entry<Integer, Collection<ByteBuffer>> entry : bucketToSerializedSplitMap.asMap().entrySet()) {
Collection<Integer> destTasks = bucketToTaskMap.get(entry.getKey());
for (Integer task : destTasks) {
int count = 0;
- for (InputSplit split : entry.getValue()) {
+ for (ByteBuffer buf : entry.getValue()) {
count++;
- MRSplitProto serializedSplit = MRInputHelpers.createSplitProto(split);
InputDataInformationEvent diEvent =
- InputDataInformationEvent.createWithSerializedPayload(task, serializedSplit
- .toByteString().asReadOnlyByteBuffer());
+ InputDataInformationEvent.createWithSerializedPayload(count, buf);
diEvent.setTargetIndex(task);
taskEvents.add(diEvent);
}
@@ -472,6 +479,7 @@ public class CustomPartitionVertex exten
context.setVertexParallelism(taskCount, VertexLocationHint.create(grouper
.createTaskLocationHints(finalSplits.toArray(new InputSplit[finalSplits.size()]))), emMap,
rootInputSpecUpdate);
+ finalSplits.clear();
}
UserPayload getBytePayload(Multimap<Integer, Integer> routingTable) throws IOException {