You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by mu...@apache.org on 2021/01/07 06:59:42 UTC

[hive] branch master updated: HIVE-24415: HiveSplitGenerator blocks Tez dispatcher (Mustafa Iman, reviewed by Ramesh Kumar Thangarajan)

This is an automated email from the ASF dual-hosted git repository.

mustafaiman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new fa39cf2  HIVE-24415: HiveSplitGenerator blocks Tez dispatcher (Mustafa Iman, reviewed by Ramesh Kumar Thangarajan)
fa39cf2 is described below

commit fa39cf27ac9e0e05f230c68490f66724179fb37a
Author: Mustafa Iman <mu...@gmail.com>
AuthorDate: Mon Nov 23 21:17:43 2020 -0800

    HIVE-24415: HiveSplitGenerator blocks Tez dispatcher (Mustafa Iman, reviewed by Ramesh Kumar Thangarajan)
---
 .../hive/ql/exec/tez/DynamicPartitionPruner.java   | 153 +++++++++------------
 .../hive/ql/exec/tez/HiveSplitGenerator.java       |  36 ++---
 .../ql/exec/tez/TestDynamicPartitionPruner.java    | 121 ++++++++++------
 3 files changed, 167 insertions(+), 143 deletions(-)

diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DynamicPartitionPruner.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DynamicPartitionPruner.java
index 6fc7096..88e3754 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DynamicPartitionPruner.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DynamicPartitionPruner.java
@@ -78,18 +78,17 @@ public class DynamicPartitionPruner {
 
   private static final Logger LOG = LoggerFactory.getLogger(DynamicPartitionPruner.class);
 
-  private final InputInitializerContext context;
-  private final MapWork work;
-  private final JobConf jobConf;
+  private InputInitializerContext context;
+  private MapWork work;
 
-
-  private final Map<String, List<SourceInfo>> sourceInfoMap =
-      new HashMap<String, List<SourceInfo>>();
+  private final Map<String, List<SourceInfo>> sourceInfoMap = new HashMap<>();
 
   private final BytesWritable writable = new BytesWritable();
 
   /* Keeps track of all events that need to be processed - irrespective of the source */
-  private final BlockingQueue<Object> queue = new LinkedBlockingQueue<Object>();
+  private final BlockingQueue<Object> queue = new LinkedBlockingQueue<>();
+  private final BlockingQueue<String> finishedVertices = new LinkedBlockingQueue<>();
+  private static final Object VERTEX_FINISH_TOKEN = new Object();
 
   /* Keeps track of vertices from which events are expected */
   private final Set<String> sourcesWaitingForEvents = new HashSet<String>();
@@ -100,37 +99,19 @@ public class DynamicPartitionPruner {
 
   private int sourceInfoCount = 0;
 
-  private final Object endOfEvents = new Object();
-
   private int totalEventCount = 0;
 
-  public DynamicPartitionPruner(InputInitializerContext context, MapWork work, JobConf jobConf) throws
-      SerDeException {
-    this.context = context;
-    this.work = work;
-    this.jobConf = jobConf;
-    synchronized (this) {
-      initialize();
+  public void prune() throws SerDeException, IOException, InterruptedException, HiveException {
+    if (sourcesWaitingForEvents.isEmpty()) {
+      return;
     }
-  }
-
-  public void prune()
-      throws SerDeException, IOException,
-      InterruptedException, HiveException {
 
-    synchronized(sourcesWaitingForEvents) {
-
-      if (sourcesWaitingForEvents.isEmpty()) {
-        return;
-      }
-
-      Set<VertexState> states = Collections.singleton(VertexState.SUCCEEDED);
-      for (String source : sourcesWaitingForEvents) {
-        // we need to get state transition updates for the vertices that will send
-        // events to us. once we have received all events and a vertex has succeeded,
-        // we can move to do the pruning.
-        context.registerForVertexStateUpdates(source, states);
-      }
+    Set<VertexState> states = Collections.singleton(VertexState.SUCCEEDED);
+    for (String source : sourcesWaitingForEvents) {
+      // we need to get state transition updates for the vertices that will send
+      // events to us. once we have received all events and a vertex has succeeded,
+      // we can move to do the pruning.
+      context.registerForVertexStateUpdates(source, states);
     }
 
     LOG.info("Waiting for events (" + sourceInfoCount + " sources) ...");
@@ -141,17 +122,15 @@ public class DynamicPartitionPruner {
     LOG.info("Ok to proceed.");
   }
 
-  public BlockingQueue<Object> getQueue() {
-    return queue;
-  }
-
   private void clear() {
     sourceInfoMap.clear();
     sourceInfoCount = 0;
   }
 
-  private void initialize() throws SerDeException {
+  public void initialize(InputInitializerContext context, MapWork work, JobConf jobConf) throws SerDeException {
     this.clear();
+    this.context = context;
+    this.work = work;
     Map<String, SourceInfo> columnMap = new HashMap<String, SourceInfo>();
     // sources represent vertex names
     Set<String> sources = work.getEventSourceTableDescMap().keySet();
@@ -381,17 +360,26 @@ public class DynamicPartitionPruner {
     while (true) {
       Object element = queue.take();
 
-      if (element == endOfEvents) {
-        // we're done processing events
-        break;
+      if (element == VERTEX_FINISH_TOKEN) {
+        String updatedSource = finishedVertices.poll();
+        calculateFinishCondition(updatedSource);
+        if (checkForSourceCompletion(updatedSource)) {
+          break;
+        }
+      } else {
+        InputInitializerEvent event = (InputInitializerEvent) element;
+        numEventsSeenPerSource.computeIfAbsent(event.getSourceVertexName(), vn -> new MutableInt(0))
+            .increment();
+
+        totalEventCount++;
+        LOG.info("Input event: " + event.getTargetInputName() + ", " + event.getTargetVertexName()
+            + ", " + (event.getUserPayload().limit() - event.getUserPayload().position()));
+        processPayload(event.getUserPayload(), event.getSourceVertexName());
+        eventCount += 1;
+        if (checkForSourceCompletion(event.getSourceVertexName())) {
+          break;
+        }
       }
-
-      InputInitializerEvent event = (InputInitializerEvent) element;
-
-      LOG.info("Input event: " + event.getTargetInputName() + ", " + event.getTargetVertexName()
-          + ", " + (event.getUserPayload().limit() - event.getUserPayload().position()));
-      processPayload(event.getUserPayload(), event.getSourceVertexName());
-      eventCount += 1;
     }
     LOG.info("Received events: " + eventCount);
   }
@@ -483,54 +471,47 @@ public class DynamicPartitionPruner {
   }
 
   public void addEvent(InputInitializerEvent event) {
-    synchronized(sourcesWaitingForEvents) {
-      if (sourcesWaitingForEvents.contains(event.getSourceVertexName())) {
-        ++totalEventCount;
-        numEventsSeenPerSource.get(event.getSourceVertexName()).increment();
-        if(!queue.offer(event)) {
-          throw new IllegalStateException("Queue full");
-        }
-        checkForSourceCompletion(event.getSourceVertexName());
-      }
+    if(!queue.offer(event)) {
+      throw new IllegalStateException("Queue full");
     }
   }
 
+  private void calculateFinishCondition(String sourceName) {
+    // Get a deterministic count of number of tasks for the vertex.
+    MutableInt prevVal = numExpectedEventsPerSource.get(sourceName);
+    int prevValInt = prevVal.intValue();
+    Preconditions.checkState(prevValInt < 0,
+        "Invalid value for numExpectedEvents for source: " + sourceName + ", oldVal=" + prevValInt);
+    prevVal.setValue((-1) * prevValInt * context.getVertexNumTasks(sourceName));
+  }
+
   public void processVertex(String name) {
     LOG.info("Vertex succeeded: " + name);
-    synchronized(sourcesWaitingForEvents) {
-      // Get a deterministic count of number of tasks for the vertex.
-      MutableInt prevVal = numExpectedEventsPerSource.get(name);
-      int prevValInt = prevVal.intValue();
-      Preconditions.checkState(prevValInt < 0,
-          "Invalid value for numExpectedEvents for source: " + name + ", oldVal=" + prevValInt);
-      prevVal.setValue((-1) * prevValInt * context.getVertexNumTasks(name));
-      checkForSourceCompletion(name);
-    }
+    finishedVertices.add(name);
+    queue.offer(VERTEX_FINISH_TOKEN);
   }
 
-  private void checkForSourceCompletion(String name) {
+  private boolean checkForSourceCompletion(String name) {
     int expectedEvents = numExpectedEventsPerSource.get(name).getValue();
     if (expectedEvents < 0) {
       // Expected events not updated yet - vertex SUCCESS notification not received.
-      return;
-    } else {
-      int processedEvents = numEventsSeenPerSource.get(name).getValue();
-      if (processedEvents == expectedEvents) {
-        sourcesWaitingForEvents.remove(name);
-        if (sourcesWaitingForEvents.isEmpty()) {
-          // we've got what we need; mark the queue
-          if(!queue.offer(endOfEvents)) {
-            throw new IllegalStateException("Queue full");
-          }
-        } else {
-          LOG.info("Waiting for " + sourcesWaitingForEvents.size() + " sources.");
-        }
-      } else if (processedEvents > expectedEvents) {
-        throw new IllegalStateException(
-            "Received too many events for " + name + ", Expected=" + expectedEvents +
-                ", Received=" + processedEvents);
+      return false;
+    }
+
+    int processedEvents = numEventsSeenPerSource.get(name).getValue();
+    if (processedEvents == expectedEvents) {
+      sourcesWaitingForEvents.remove(name);
+      if (sourcesWaitingForEvents.isEmpty()) {
+        return true;
+      } else {
+        LOG.info("Waiting for " + sourcesWaitingForEvents.size() + " sources.");
+        return false;
       }
-      return;
+    } else if (processedEvents > expectedEvents) {
+      throw new IllegalStateException(
+          "Received too many events for " + name + ", Expected=" + expectedEvents +
+              ", Received=" + processedEvents);
     }
+    return false;
   }
 }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java
index f6d0a89..8d682e0 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java
@@ -86,13 +86,13 @@ public class HiveSplitGenerator extends InputInitializer {
   private static final Logger LOG = LoggerFactory.getLogger(HiveSplitGenerator.class);
 
   private final DynamicPartitionPruner pruner;
-  private final Configuration conf;
-  private final JobConf jobConf;
-  private final MRInputUserPayloadProto userPayloadProto;
-  private final MapWork work;
+  private Configuration conf;
+  private JobConf jobConf;
+  private MRInputUserPayloadProto userPayloadProto;
+  private MapWork work;
   private final SplitGrouper splitGrouper = new SplitGrouper();
-  private final SplitLocationProvider splitLocationProvider;
-  private final Optional<Integer> numSplits;
+  private SplitLocationProvider splitLocationProvider;
+  private Optional<Integer> numSplits;
 
   private boolean generateSingleSplit;
 
@@ -128,11 +128,14 @@ public class HiveSplitGenerator extends InputInitializer {
     this.numSplits = Optional.ofNullable(numSplits);
   }
 
-  public HiveSplitGenerator(InputInitializerContext initializerContext) throws IOException,
-      SerDeException {
+  public HiveSplitGenerator(InputInitializerContext initializerContext) {
     super(initializerContext);
-
     Preconditions.checkNotNull(initializerContext);
+    pruner = new DynamicPartitionPruner();
+    this.numSplits = Optional.empty();
+  }
+
+  private void prepare(InputInitializerContext initializerContext) throws IOException, SerDeException {
     userPayloadProto =
         MRInputHelpers.parseMRInputPayload(initializerContext.getInputUserPayload());
 
@@ -148,18 +151,16 @@ public class HiveSplitGenerator extends InputInitializer {
     this.splitLocationProvider =
         Utils.getSplitLocationProvider(conf, work.getCacheAffinity(), LOG);
     LOG.info("SplitLocationProvider: " + splitLocationProvider);
-
-    // Events can start coming in the moment the InputInitializer is created. The pruner
-    // must be setup and initialized here so that it sets up it's structures to start accepting events.
-    // Setting it up in initialize leads to a window where events may come in before the pruner is
-    // initialized, which may cause it to drop events.
-    pruner = new DynamicPartitionPruner(initializerContext, work, jobConf);
-    this.numSplits = Optional.empty();
   }
 
   @SuppressWarnings("unchecked")
   @Override
   public List<Event> initialize() throws Exception {
+    if (getContext() != null) {
+      // called from Tez AM.
+      prepare(getContext());
+    }
+
     // Setup the map work for this thread. Pruning modified the work instance to potentially remove
     // partitions. The same work instance must be used when generating splits.
     Utilities.setMapWork(jobConf, work);
@@ -169,6 +170,7 @@ public class HiveSplitGenerator extends InputInitializer {
 
       // perform dynamic partition pruning
       if (pruner != null) {
+        pruner.initialize(getContext(), work, jobConf);
         pruner.prune();
       }
 
@@ -385,6 +387,8 @@ public class HiveSplitGenerator extends InputInitializer {
 
   @Override
   public void onVertexStateUpdated(VertexStateUpdate stateUpdate) {
+    // The pruner registers for vertex state updates only after it has been initialized,
+    // so no state update can reach this callback before the pruner is ready to handle it.
     pruner.processVertex(stateUpdate.getVertexName());
   }
 
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestDynamicPartitionPruner.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestDynamicPartitionPruner.java
index d38691e..7ced5a2 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestDynamicPartitionPruner.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestDynamicPartitionPruner.java
@@ -14,7 +14,9 @@
 
 package org.apache.hadoop.hive.ql.exec.tez;
 
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
 
@@ -25,6 +27,7 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.LongAdder;
 import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.ReentrantLock;
 
@@ -45,8 +48,9 @@ public class TestDynamicPartitionPruner {
       SerDeException {
     InputInitializerContext mockInitContext = mock(InputInitializerContext.class);
     MapWork mapWork = mock(MapWork.class);
-    DynamicPartitionPruner pruner =
-        new DynamicPartitionPrunerForEventTesting(mockInitContext, mapWork);
+    DynamicPartitionPrunerForEventTesting pruner =
+        new DynamicPartitionPrunerForEventTesting();
+    pruner.initialize(mockInitContext, mapWork, new JobConf());
 
     PruneRunnable pruneRunnable = new PruneRunnable(pruner);
     Thread t = new Thread(pruneRunnable);
@@ -55,6 +59,8 @@ public class TestDynamicPartitionPruner {
       pruneRunnable.start();
       pruneRunnable.awaitEnd();
       // Return immediately. No entries found for pruning. Verified via the timeout.
+      assertEquals(0, pruner.eventsProceessed.intValue());
+      assertEquals(0, pruner.filteredSources.intValue());
     } finally {
       t.interrupt();
       t.join();
@@ -68,8 +74,9 @@ public class TestDynamicPartitionPruner {
     doReturn(1).when(mockInitContext).getVertexNumTasks("v1");
 
     MapWork mapWork = createMockMapWork(new TestSource("v1", 1));
-    DynamicPartitionPruner pruner =
-        new DynamicPartitionPrunerForEventTesting(mockInitContext, mapWork);
+    DynamicPartitionPrunerForEventTesting pruner =
+        new DynamicPartitionPrunerForEventTesting();
+    pruner.initialize(mockInitContext, mapWork, new JobConf());
 
 
     PruneRunnable pruneRunnable = new PruneRunnable(pruner);
@@ -86,7 +93,9 @@ public class TestDynamicPartitionPruner {
       pruner.processVertex("v1");
 
       pruneRunnable.awaitEnd();
-      assertFalse(pruneRunnable.inError.get());
+      assertNoError(pruneRunnable);
+      assertEquals(1, pruner.eventsProceessed.intValue());
+      assertEquals(1, pruner.filteredSources.intValue());
     } finally {
       t.interrupt();
       t.join();
@@ -100,8 +109,9 @@ public class TestDynamicPartitionPruner {
     doReturn(1).when(mockInitContext).getVertexNumTasks("v1");
 
     MapWork mapWork = createMockMapWork(new TestSource("v1", 1));
-    DynamicPartitionPruner pruner =
-        new DynamicPartitionPrunerForEventTesting(mockInitContext, mapWork);
+    DynamicPartitionPrunerForEventTesting pruner =
+        new DynamicPartitionPrunerForEventTesting();
+    pruner.initialize(mockInitContext, mapWork, new JobConf());
 
 
     PruneRunnable pruneRunnable = new PruneRunnable(pruner);
@@ -118,7 +128,9 @@ public class TestDynamicPartitionPruner {
       pruner.addEvent(event);
 
       pruneRunnable.awaitEnd();
-      assertFalse(pruneRunnable.inError.get());
+      assertNoError(pruneRunnable);
+      assertEquals(1, pruner.eventsProceessed.intValue());
+      assertEquals(1, pruner.filteredSources.intValue());
     } finally {
       t.interrupt();
       t.join();
@@ -131,8 +143,9 @@ public class TestDynamicPartitionPruner {
     doReturn(2).when(mockInitContext).getVertexNumTasks("v1");
 
     MapWork mapWork = createMockMapWork(new TestSource("v1", 2));
-    DynamicPartitionPruner pruner =
-        new DynamicPartitionPrunerForEventTesting(mockInitContext, mapWork);
+    DynamicPartitionPrunerForEventTesting pruner =
+        new DynamicPartitionPrunerForEventTesting();
+    pruner.initialize(mockInitContext, mapWork, new JobConf());
 
     PruneRunnable pruneRunnable = new PruneRunnable(pruner);
     Thread t = new Thread(pruneRunnable);
@@ -151,7 +164,9 @@ public class TestDynamicPartitionPruner {
       pruner.processVertex("v1");
 
       pruneRunnable.awaitEnd();
-      assertFalse(pruneRunnable.inError.get());
+      assertNoError(pruneRunnable);
+      assertEquals(4, pruner.eventsProceessed.intValue());
+      assertEquals(2, pruner.filteredSources.intValue());
     } finally {
       t.interrupt();
       t.join();
@@ -164,8 +179,9 @@ public class TestDynamicPartitionPruner {
     doReturn(2).when(mockInitContext).getVertexNumTasks("v1");
 
     MapWork mapWork = createMockMapWork(new TestSource("v1", 2));
-    DynamicPartitionPruner pruner =
-        new DynamicPartitionPrunerForEventTesting(mockInitContext, mapWork);
+    DynamicPartitionPrunerForEventTesting pruner =
+        new DynamicPartitionPrunerForEventTesting();
+    pruner.initialize(mockInitContext, mapWork, new JobConf());
 
     PruneRunnable pruneRunnable = new PruneRunnable(pruner);
     Thread t = new Thread(pruneRunnable);
@@ -184,7 +200,9 @@ public class TestDynamicPartitionPruner {
       pruner.addEvent(event);
 
       pruneRunnable.awaitEnd();
-      assertFalse(pruneRunnable.inError.get());
+      assertNoError(pruneRunnable);
+      assertEquals(4, pruner.eventsProceessed.intValue());
+      assertEquals(2, pruner.filteredSources.intValue());
     } finally {
       t.interrupt();
       t.join();
@@ -198,8 +216,9 @@ public class TestDynamicPartitionPruner {
     doReturn(3).when(mockInitContext).getVertexNumTasks("v2");
 
     MapWork mapWork = createMockMapWork(new TestSource("v1", 2), new TestSource("v2", 1));
-    DynamicPartitionPruner pruner =
-        new DynamicPartitionPrunerForEventTesting(mockInitContext, mapWork);
+    DynamicPartitionPrunerForEventTesting pruner =
+        new DynamicPartitionPrunerForEventTesting();
+    pruner.initialize(mockInitContext, mapWork, new JobConf());
 
     PruneRunnable pruneRunnable = new PruneRunnable(pruner);
     Thread t = new Thread(pruneRunnable);
@@ -228,7 +247,9 @@ public class TestDynamicPartitionPruner {
       pruner.processVertex("v2");
 
       pruneRunnable.awaitEnd();
-      assertFalse(pruneRunnable.inError.get());
+      assertNoError(pruneRunnable);
+      assertEquals(7, pruner.eventsProceessed.intValue());
+      assertEquals(3, pruner.filteredSources.intValue());
     } finally {
       t.interrupt();
       t.join();
@@ -242,8 +263,9 @@ public class TestDynamicPartitionPruner {
     doReturn(3).when(mockInitContext).getVertexNumTasks("v2");
 
     MapWork mapWork = createMockMapWork(new TestSource("v1", 2), new TestSource("v2", 1));
-    DynamicPartitionPruner pruner =
-        new DynamicPartitionPrunerForEventTesting(mockInitContext, mapWork);
+    DynamicPartitionPrunerForEventTesting pruner =
+        new DynamicPartitionPrunerForEventTesting();
+    pruner.initialize(mockInitContext, mapWork, new JobConf());
 
     PruneRunnable pruneRunnable = new PruneRunnable(pruner);
     Thread t = new Thread(pruneRunnable);
@@ -272,7 +294,9 @@ public class TestDynamicPartitionPruner {
       pruner.addEvent(eventV2);
 
       pruneRunnable.awaitEnd();
-      assertFalse(pruneRunnable.inError.get());
+      assertNoError(pruneRunnable);
+      assertEquals(7, pruner.eventsProceessed.intValue());
+      assertEquals(3, pruner.filteredSources.intValue());
     } finally {
       t.interrupt();
       t.join();
@@ -286,8 +310,9 @@ public class TestDynamicPartitionPruner {
     doReturn(3).when(mockInitContext).getVertexNumTasks("v2");
 
     MapWork mapWork = createMockMapWork(new TestSource("v1", 2), new TestSource("v2", 1));
-    DynamicPartitionPruner pruner =
-        new DynamicPartitionPrunerForEventTesting(mockInitContext, mapWork);
+    DynamicPartitionPrunerForEventTesting pruner =
+        new DynamicPartitionPrunerForEventTesting();
+    pruner.initialize(mockInitContext, mapWork, new JobConf());
 
     PruneRunnable pruneRunnable = new PruneRunnable(pruner);
     Thread t = new Thread(pruneRunnable);
@@ -315,22 +340,25 @@ public class TestDynamicPartitionPruner {
       pruner.addEvent(eventV2);
 
       pruneRunnable.awaitEnd();
-      assertFalse(pruneRunnable.inError.get());
+      assertNoError(pruneRunnable);
+      assertEquals(7, pruner.eventsProceessed.intValue());
+      assertEquals(3, pruner.filteredSources.intValue());
     } finally {
       t.interrupt();
       t.join();
     }
   }
 
-  @Test(timeout = 20000, expected = IllegalStateException.class)
+  @Test(timeout = 20000)
   public void testExtraEvents() throws InterruptedException, IOException, HiveException,
       SerDeException {
     InputInitializerContext mockInitContext = mock(InputInitializerContext.class);
     doReturn(1).when(mockInitContext).getVertexNumTasks("v1");
 
     MapWork mapWork = createMockMapWork(new TestSource("v1", 1));
-    DynamicPartitionPruner pruner =
-        new DynamicPartitionPrunerForEventTesting(mockInitContext, mapWork);
+    DynamicPartitionPrunerForEventTesting pruner =
+        new DynamicPartitionPrunerForEventTesting();
+    pruner.initialize(mockInitContext, mapWork, new JobConf());
 
 
     PruneRunnable pruneRunnable = new PruneRunnable(pruner);
@@ -348,7 +376,10 @@ public class TestDynamicPartitionPruner {
       pruner.processVertex("v1");
 
       pruneRunnable.awaitEnd();
-      assertFalse(pruneRunnable.inError.get());
+      assertTrue(pruneRunnable.inError.get());
+      assertTrue(pruneRunnable.exception instanceof IllegalStateException);
+      assertEquals(2, pruner.eventsProceessed.intValue());
+      assertEquals(0, pruner.filteredSources.intValue());
     } finally {
       t.interrupt();
       t.join();
@@ -362,8 +393,9 @@ public class TestDynamicPartitionPruner {
     doReturn(1).when(mockInitContext).getVertexNumTasks("v1");
 
     MapWork mapWork = createMockMapWork(new TestSource("v1", 1));
-    DynamicPartitionPruner pruner =
-        new DynamicPartitionPrunerForEventTesting(mockInitContext, mapWork);
+    DynamicPartitionPrunerForEventTesting pruner =
+        new DynamicPartitionPrunerForEventTesting();
+    pruner.initialize(mockInitContext, mapWork, new JobConf());
 
 
     PruneRunnable pruneRunnable = new PruneRunnable(pruner);
@@ -380,13 +412,21 @@ public class TestDynamicPartitionPruner {
       Thread.sleep(3000l);
       // The pruner should not have completed.
       assertFalse(pruneRunnable.ended.get());
-      assertFalse(pruneRunnable.inError.get());
+      assertNoError(pruneRunnable);
+      assertEquals(0, pruner.eventsProceessed.intValue());
+      assertEquals(0, pruner.filteredSources.intValue());
     } finally {
       t.interrupt();
       t.join();
     }
   }
 
+  private void assertNoError(PruneRunnable pruneRunnable) {
+    if (pruneRunnable.inError.get()) {
+      throw new AssertionError(pruneRunnable.exception);
+    }
+  }
+
   private static class PruneRunnable implements Runnable {
 
     final DynamicPartitionPruner pruner;
@@ -396,6 +436,7 @@ public class TestDynamicPartitionPruner {
     final AtomicBoolean started = new AtomicBoolean(false);
     final AtomicBoolean ended = new AtomicBoolean(false);
     final AtomicBoolean inError = new AtomicBoolean(false);
+    Exception exception;
 
     private PruneRunnable(DynamicPartitionPruner pruner) {
       this.pruner = pruner;
@@ -433,17 +474,18 @@ public class TestDynamicPartitionPruner {
         } finally {
           lock.unlock();
         }
-
         pruner.prune();
-        lock.lock();
+      } catch (Exception e) {
+        inError.set(true);
+        exception = e;
+      } finally {
         try {
+          lock.lock();
           ended.set(true);
           endCondition.signal();
         } finally {
           lock.unlock();
         }
-      } catch (SerDeException | IOException | InterruptedException | HiveException e) {
-        inError.set(true);
       }
     }
   }
@@ -512,11 +554,8 @@ public class TestDynamicPartitionPruner {
 
   private static class DynamicPartitionPrunerForEventTesting extends DynamicPartitionPruner {
 
-
-    public DynamicPartitionPrunerForEventTesting(
-        InputInitializerContext context, MapWork work) throws SerDeException {
-      super(context, work, new JobConf());
-    }
+    LongAdder filteredSources = new LongAdder();
+    LongAdder eventsProceessed = new LongAdder();
 
     @Override
     protected SourceInfo createSourceInfo(TableDesc t, ExprNodeDesc partKeyExpr, String columnName, String columnType,
@@ -528,14 +567,14 @@ public class TestDynamicPartitionPruner {
     @Override
     protected String processPayload(ByteBuffer payload, String sourceName) throws SerDeException,
         IOException {
-      // No-op: testing events only
+      eventsProceessed.increment();
       return sourceName;
     }
 
     @Override
     protected void prunePartitionSingleSource(String source, SourceInfo si)
         throws HiveException {
-      // No-op: testing events only
+      filteredSources.increment();
     }
   }
 }