You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by gu...@apache.org on 2014/10/18 02:23:25 UTC

svn commit: r1632710 - /hive/branches/branch-0.14/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DynamicPartitionPruner.java

Author: gunther
Date: Sat Oct 18 00:23:25 2014
New Revision: 1632710

URL: http://svn.apache.org/r1632710
Log:
HIVE-8489: Add sanity check to dynamic partition pruning (Gunther Hagleitner, reviewed by Vikram Dixit K)

Modified:
    hive/branches/branch-0.14/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DynamicPartitionPruner.java

Modified: hive/branches/branch-0.14/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DynamicPartitionPruner.java
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.14/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DynamicPartitionPruner.java?rev=1632710&r1=1632709&r2=1632710&view=diff
==============================================================================
--- hive/branches/branch-0.14/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DynamicPartitionPruner.java (original)
+++ hive/branches/branch-0.14/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DynamicPartitionPruner.java Sat Oct 18 00:23:25 2014
@@ -87,6 +87,8 @@ public class DynamicPartitionPruner {
 
   private final Object endOfEvents = new Object();
 
+  private int totalEventCount = 0;
+
   public DynamicPartitionPruner() {
   }
 
@@ -114,7 +116,7 @@ public class DynamicPartitionPruner {
     // synchronous event processing loop. Won't return until all events have
     // been processed.
     this.processEvents();
-    this.prunePartitions(work);
+    this.prunePartitions(work, context);
     LOG.info("Ok to proceed.");
   }
 
@@ -163,12 +165,22 @@ public class DynamicPartitionPruner {
     }
   }
 
-  private void prunePartitions(MapWork work) throws HiveException {
+  private void prunePartitions(MapWork work, InputInitializerContext context) throws HiveException {
+    int expectedEvents = 0;
     for (String source : this.sourceInfoMap.keySet()) {
       for (SourceInfo si : this.sourceInfoMap.get(source)) {
+        int taskNum = context.getVertexNumTasks(source);
+        LOG.info("Expecting " + taskNum + " events for vertex " + source);
+        expectedEvents += taskNum;
         prunePartitionSingleSource(source, si, work);
       }
     }
+
+    // sanity check. all tasks must submit events for us to succeed.
+    if (expectedEvents != totalEventCount) {
+      LOG.error("Expecting: " + expectedEvents + ", received: " + totalEventCount);
+      throw new HiveException("Incorrect event count in dynamic parition pruning");
+    }
   }
 
   private void prunePartitionSingleSource(String source, SourceInfo si, MapWork work)
@@ -396,7 +408,8 @@ public class DynamicPartitionPruner {
   public void addEvent(InputInitializerEvent event) {
     synchronized(sourcesWaitingForEvents) {
       if (sourcesWaitingForEvents.contains(event.getSourceVertexName())) {
-          queue.offer(event);
+        ++totalEventCount;
+        queue.offer(event);
       }
     }
   }