You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by gu...@apache.org on 2014/10/18 02:22:52 UTC
svn commit: r1632709 - /hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DynamicPartitionPruner.java
Author: gunther
Date: Sat Oct 18 00:22:51 2014
New Revision: 1632709
URL: http://svn.apache.org/r1632709
Log:
HIVE-8489: Add sanity check to dynamic partition pruning (Gunther Hagleitner, reviewed by Vikram Dixit K)
Modified:
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DynamicPartitionPruner.java
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DynamicPartitionPruner.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DynamicPartitionPruner.java?rev=1632709&r1=1632708&r2=1632709&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DynamicPartitionPruner.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DynamicPartitionPruner.java Sat Oct 18 00:22:51 2014
@@ -87,6 +87,8 @@ public class DynamicPartitionPruner {
private final Object endOfEvents = new Object();
+ private int totalEventCount = 0;
+
public DynamicPartitionPruner() {
}
@@ -114,7 +116,7 @@ public class DynamicPartitionPruner {
// synchronous event processing loop. Won't return until all events have
// been processed.
this.processEvents();
- this.prunePartitions(work);
+ this.prunePartitions(work, context);
LOG.info("Ok to proceed.");
}
@@ -163,12 +165,22 @@ public class DynamicPartitionPruner {
}
}
- private void prunePartitions(MapWork work) throws HiveException {
+ private void prunePartitions(MapWork work, InputInitializerContext context) throws HiveException {
+ int expectedEvents = 0;
for (String source : this.sourceInfoMap.keySet()) {
for (SourceInfo si : this.sourceInfoMap.get(source)) {
+ int taskNum = context.getVertexNumTasks(source);
+ LOG.info("Expecting " + taskNum + " events for vertex " + source);
+ expectedEvents += taskNum;
prunePartitionSingleSource(source, si, work);
}
}
+
+ // sanity check. all tasks must submit events for us to succeed.
+ if (expectedEvents != totalEventCount) {
+ LOG.error("Expecting: " + expectedEvents + ", received: " + totalEventCount);
+ throw new HiveException("Incorrect event count in dynamic parition pruning");
+ }
}
private void prunePartitionSingleSource(String source, SourceInfo si, MapWork work)
@@ -396,7 +408,8 @@ public class DynamicPartitionPruner {
public void addEvent(InputInitializerEvent event) {
synchronized(sourcesWaitingForEvents) {
if (sourcesWaitingForEvents.contains(event.getSourceVertexName())) {
- queue.offer(event);
+ ++totalEventCount;
+ queue.offer(event);
}
}
}