You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by vi...@apache.org on 2014/10/29 01:41:42 UTC

svn commit: r1635016 - /hive/branches/branch-0.14/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionVertex.java

Author: vikram
Date: Wed Oct 29 00:41:41 2014
New Revision: 1635016

URL: http://svn.apache.org/r1635016
Log:
HIVE-8597: SMB join small table side should use the same set of serialized payloads across tasks (Siddharth Seth via Vikram Dixit)

Modified:
    hive/branches/branch-0.14/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionVertex.java

Modified: hive/branches/branch-0.14/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionVertex.java
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.14/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionVertex.java?rev=1635016&r1=1635015&r2=1635016&view=diff
==============================================================================
--- hive/branches/branch-0.14/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionVertex.java (original)
+++ hive/branches/branch-0.14/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionVertex.java Wed Oct 29 00:41:41 2014
@@ -32,6 +32,7 @@ import java.util.Set;
 import java.util.TreeMap;
 import java.util.TreeSet;
 
+import com.google.common.collect.LinkedListMultimap;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -109,7 +110,6 @@ public class CustomPartitionVertex exten
   VertexManagerPluginContext context;
 
   private InputConfigureVertexTasksEvent configureVertexTaskEvent;
-  private List<InputDataInformationEvent> dataInformationEvents;
   private int numBuckets = -1;
   private Configuration conf = null;
   private final SplitGrouper grouper = new SplitGrouper();
@@ -223,8 +223,6 @@ public class CustomPartitionVertex exten
         configureVertexTaskEvent = cEvent;
         LOG.info("Configure task for input name: " + inputName + " num tasks: "
             + configureVertexTaskEvent.getNumTasks());
-        dataInformationEvents =
-            Lists.newArrayListWithCapacity(configureVertexTaskEvent.getNumTasks());
       }
       if (event instanceof InputUpdatePayloadEvent) {
         // this event can never occur. If it does, fail.
@@ -232,7 +230,6 @@ public class CustomPartitionVertex exten
       } else if (event instanceof InputDataInformationEvent) {
         dataInformationEventSeen = true;
         InputDataInformationEvent diEvent = (InputDataInformationEvent) event;
-        dataInformationEvents.add(diEvent);
         FileSplit fileSplit;
         try {
           fileSplit = getFileSplitFromEvent(diEvent);
@@ -341,16 +338,26 @@ public class CustomPartitionVertex exten
         + " multi mr inputs. " + bucketToTaskMap);
 
     Integer[] numSplitsForTask = new Integer[taskCount];
+
+    Multimap<Integer, ByteBuffer> bucketToSerializedSplitMap = LinkedListMultimap.create();
+
+    // Create the list of serialized splits for each bucket.
     for (Entry<Integer, Collection<InputSplit>> entry : bucketToGroupedSplitMap.asMap().entrySet()) {
+      for (InputSplit split : entry.getValue()) {
+        MRSplitProto serializedSplit = MRInputHelpers.createSplitProto(split);
+        ByteBuffer bs = serializedSplit.toByteString().asReadOnlyByteBuffer();
+        bucketToSerializedSplitMap.put(entry.getKey(), bs);
+      }
+    }
+
+    for (Entry<Integer, Collection<ByteBuffer>> entry : bucketToSerializedSplitMap.asMap().entrySet()) {
       Collection<Integer> destTasks = bucketToTaskMap.get(entry.getKey());
       for (Integer task : destTasks) {
         int count = 0;
-        for (InputSplit split : entry.getValue()) {
+        for (ByteBuffer buf : entry.getValue()) {
           count++;
-          MRSplitProto serializedSplit = MRInputHelpers.createSplitProto(split);
           InputDataInformationEvent diEvent =
-              InputDataInformationEvent.createWithSerializedPayload(task, serializedSplit
-                  .toByteString().asReadOnlyByteBuffer());
+              InputDataInformationEvent.createWithSerializedPayload(count, buf);
           diEvent.setTargetIndex(task);
           taskEvents.add(diEvent);
         }
@@ -472,6 +479,7 @@ public class CustomPartitionVertex exten
     context.setVertexParallelism(taskCount, VertexLocationHint.create(grouper
         .createTaskLocationHints(finalSplits.toArray(new InputSplit[finalSplits.size()]))), emMap,
         rootInputSpecUpdate);
+    finalSplits.clear();
   }
 
   UserPayload getBytePayload(Multimap<Integer, Integer> routingTable) throws IOException {