Posted to commits@hive.apache.org by mu...@apache.org on 2021/01/07 06:59:42 UTC
[hive] branch master updated: HIVE-24415: HiveSplitGenerator blocks Tez dispatcher (Mustafa Iman, reviewed by Ramesh Kumar Thangarajan)
This is an automated email from the ASF dual-hosted git repository.
mustafaiman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new fa39cf2 HIVE-24415: HiveSplitGenerator blocks Tez dispatcher (Mustafa Iman, reviewed by Ramesh Kumar Thangarajan)
fa39cf2 is described below
commit fa39cf27ac9e0e05f230c68490f66724179fb37a
Author: Mustafa Iman <mu...@gmail.com>
AuthorDate: Mon Nov 23 21:17:43 2020 -0800
HIVE-24415: HiveSplitGenerator blocks Tez dispatcher (Mustafa Iman, reviewed by Ramesh Kumar Thangarajan)
---
.../hive/ql/exec/tez/DynamicPartitionPruner.java | 153 +++++++++------------
.../hive/ql/exec/tez/HiveSplitGenerator.java | 36 ++---
.../ql/exec/tez/TestDynamicPartitionPruner.java | 121 ++++++++++------
3 files changed, 167 insertions(+), 143 deletions(-)
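
The patch replaces the pruner's lock-based event handling with a single consumer
loop fed by a blocking queue. Tez's dispatcher thread now only enqueues work
(addEvent and processVertex just push onto the queue), while all expensive
payload processing happens on the thread that calls prune(), so the dispatcher
is never blocked. As a rough illustration of the pattern, here is a minimal
sketch, not the Hive code itself; the class and token names are made up:

    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.LinkedBlockingQueue;

    // Sketch of the queue-plus-sentinel pattern this patch adopts.
    class EventLoopSketch {
        // Sentinel object marking "a source vertex finished".
        private static final Object FINISH_TOKEN = new Object();
        private final BlockingQueue<Object> queue = new LinkedBlockingQueue<>();

        // Dispatcher thread: enqueue and return immediately.
        void addEvent(Object event) {
            if (!queue.offer(event)) {
                throw new IllegalStateException("Queue full");
            }
        }

        // Dispatcher thread: signal that a source vertex succeeded.
        void sourceFinished() {
            queue.offer(FINISH_TOKEN);
        }

        // Consumer thread: all heavy work happens here.
        void processEvents() throws InterruptedException {
            while (true) {
                Object element = queue.take();
                if (element == FINISH_TOKEN) {
                    break; // the real loop re-checks completion per source
                }
                handle(element);
            }
        }

        void handle(Object event) {
            // expensive payload processing would go here
        }
    }
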
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DynamicPartitionPruner.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DynamicPartitionPruner.java
index 6fc7096..88e3754 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DynamicPartitionPruner.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DynamicPartitionPruner.java
@@ -78,18 +78,17 @@ public class DynamicPartitionPruner {
private static final Logger LOG = LoggerFactory.getLogger(DynamicPartitionPruner.class);
- private final InputInitializerContext context;
- private final MapWork work;
- private final JobConf jobConf;
+ private InputInitializerContext context;
+ private MapWork work;
-
- private final Map<String, List<SourceInfo>> sourceInfoMap =
- new HashMap<String, List<SourceInfo>>();
+ private final Map<String, List<SourceInfo>> sourceInfoMap = new HashMap<>();
private final BytesWritable writable = new BytesWritable();
/* Keeps track of all events that need to be processed - irrespective of the source */
- private final BlockingQueue<Object> queue = new LinkedBlockingQueue<Object>();
+ private final BlockingQueue<Object> queue = new LinkedBlockingQueue<>();
+ private final BlockingQueue<String> finishedVertices = new LinkedBlockingQueue<>();
+ private static final Object VERTEX_FINISH_TOKEN = new Object();
/* Keeps track of vertices from which events are expected */
private final Set<String> sourcesWaitingForEvents = new HashSet<String>();
@@ -100,37 +99,19 @@ public class DynamicPartitionPruner {
private int sourceInfoCount = 0;
- private final Object endOfEvents = new Object();
-
private int totalEventCount = 0;
- public DynamicPartitionPruner(InputInitializerContext context, MapWork work, JobConf jobConf) throws
- SerDeException {
- this.context = context;
- this.work = work;
- this.jobConf = jobConf;
- synchronized (this) {
- initialize();
+ public void prune() throws SerDeException, IOException, InterruptedException, HiveException {
+ if (sourcesWaitingForEvents.isEmpty()) {
+ return;
}
- }
-
- public void prune()
- throws SerDeException, IOException,
- InterruptedException, HiveException {
- synchronized(sourcesWaitingForEvents) {
-
- if (sourcesWaitingForEvents.isEmpty()) {
- return;
- }
-
- Set<VertexState> states = Collections.singleton(VertexState.SUCCEEDED);
- for (String source : sourcesWaitingForEvents) {
- // we need to get state transition updates for the vertices that will send
- // events to us. once we have received all events and a vertex has succeeded,
- // we can move to do the pruning.
- context.registerForVertexStateUpdates(source, states);
- }
+ Set<VertexState> states = Collections.singleton(VertexState.SUCCEEDED);
+ for (String source : sourcesWaitingForEvents) {
+ // we need to get state transition updates for the vertices that will send
+ // events to us. once we have received all events and a vertex has succeeded,
+ // we can move to do the pruning.
+ context.registerForVertexStateUpdates(source, states);
}
LOG.info("Waiting for events (" + sourceInfoCount + " sources) ...");
@@ -141,17 +122,15 @@ public class DynamicPartitionPruner {
LOG.info("Ok to proceed.");
}
- public BlockingQueue<Object> getQueue() {
- return queue;
- }
-
private void clear() {
sourceInfoMap.clear();
sourceInfoCount = 0;
}
- private void initialize() throws SerDeException {
+ public void initialize(InputInitializerContext context, MapWork work, JobConf jobConf) throws SerDeException {
this.clear();
+ this.context = context;
+ this.work = work;
Map<String, SourceInfo> columnMap = new HashMap<String, SourceInfo>();
// sources represent vertex names
Set<String> sources = work.getEventSourceTableDescMap().keySet();
@@ -381,17 +360,26 @@ public class DynamicPartitionPruner {
while (true) {
Object element = queue.take();
- if (element == endOfEvents) {
- // we're done processing events
- break;
+ if (element == VERTEX_FINISH_TOKEN) {
+ String updatedSource = finishedVertices.poll();
+ calculateFinishCondition(updatedSource);
+ if (checkForSourceCompletion(updatedSource)) {
+ break;
+ }
+ } else {
+ InputInitializerEvent event = (InputInitializerEvent) element;
+ numEventsSeenPerSource.computeIfAbsent(event.getSourceVertexName(), vn -> new MutableInt(0))
+ .increment();
+
+ totalEventCount++;
+ LOG.info("Input event: " + event.getTargetInputName() + ", " + event.getTargetVertexName()
+ + ", " + (event.getUserPayload().limit() - event.getUserPayload().position()));
+ processPayload(event.getUserPayload(), event.getSourceVertexName());
+ eventCount += 1;
+ if (checkForSourceCompletion(event.getSourceVertexName())) {
+ break;
+ }
}
-
- InputInitializerEvent event = (InputInitializerEvent) element;
-
- LOG.info("Input event: " + event.getTargetInputName() + ", " + event.getTargetVertexName()
- + ", " + (event.getUserPayload().limit() - event.getUserPayload().position()));
- processPayload(event.getUserPayload(), event.getSourceVertexName());
- eventCount += 1;
}
LOG.info("Received events: " + eventCount);
}
@@ -483,54 +471,47 @@ public class DynamicPartitionPruner {
}
public void addEvent(InputInitializerEvent event) {
- synchronized(sourcesWaitingForEvents) {
- if (sourcesWaitingForEvents.contains(event.getSourceVertexName())) {
- ++totalEventCount;
- numEventsSeenPerSource.get(event.getSourceVertexName()).increment();
- if(!queue.offer(event)) {
- throw new IllegalStateException("Queue full");
- }
- checkForSourceCompletion(event.getSourceVertexName());
- }
+ if(!queue.offer(event)) {
+ throw new IllegalStateException("Queue full");
}
}
+ private void calculateFinishCondition(String sourceName) {
+ // Get a deterministic count of number of tasks for the vertex.
+ MutableInt prevVal = numExpectedEventsPerSource.get(sourceName);
+ int prevValInt = prevVal.intValue();
+ Preconditions.checkState(prevValInt < 0,
+ "Invalid value for numExpectedEvents for source: " + sourceName + ", oldVal=" + prevValInt);
+ prevVal.setValue((-1) * prevValInt * context.getVertexNumTasks(sourceName));
+ }
+
public void processVertex(String name) {
LOG.info("Vertex succeeded: " + name);
- synchronized(sourcesWaitingForEvents) {
- // Get a deterministic count of number of tasks for the vertex.
- MutableInt prevVal = numExpectedEventsPerSource.get(name);
- int prevValInt = prevVal.intValue();
- Preconditions.checkState(prevValInt < 0,
- "Invalid value for numExpectedEvents for source: " + name + ", oldVal=" + prevValInt);
- prevVal.setValue((-1) * prevValInt * context.getVertexNumTasks(name));
- checkForSourceCompletion(name);
- }
+ finishedVertices.add(name);
+ queue.offer(VERTEX_FINISH_TOKEN);
}
- private void checkForSourceCompletion(String name) {
+ private boolean checkForSourceCompletion(String name) {
int expectedEvents = numExpectedEventsPerSource.get(name).getValue();
if (expectedEvents < 0) {
// Expected events not updated yet - vertex SUCCESS notification not received.
- return;
- } else {
- int processedEvents = numEventsSeenPerSource.get(name).getValue();
- if (processedEvents == expectedEvents) {
- sourcesWaitingForEvents.remove(name);
- if (sourcesWaitingForEvents.isEmpty()) {
- // we've got what we need; mark the queue
- if(!queue.offer(endOfEvents)) {
- throw new IllegalStateException("Queue full");
- }
- } else {
- LOG.info("Waiting for " + sourcesWaitingForEvents.size() + " sources.");
- }
- } else if (processedEvents > expectedEvents) {
- throw new IllegalStateException(
- "Received too many events for " + name + ", Expected=" + expectedEvents +
- ", Received=" + processedEvents);
+ return false;
+ }
+
+ int processedEvents = numEventsSeenPerSource.get(name).getValue();
+ if (processedEvents == expectedEvents) {
+ sourcesWaitingForEvents.remove(name);
+ if (sourcesWaitingForEvents.isEmpty()) {
+ return true;
+ } else {
+ LOG.info("Waiting for " + sourcesWaitingForEvents.size() + " sources.");
+ return false;
}
- return;
+ } else if (processedEvents > expectedEvents) {
+ throw new IllegalStateException(
+ "Received too many events for " + name + ", Expected=" + expectedEvents +
+ ", Received=" + processedEvents);
}
+ return false;
}
}
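
A note on the bookkeeping above: numExpectedEventsPerSource holds a negative
per-task event count until the source vertex succeeds; calculateFinishCondition()
then multiplies by the task count to get the exact number of events that
completes the source. A hedged sketch of the arithmetic, using plain ints in
place of the MutableInt map the real code keeps:

    int expectedPerTask = 2;          // events each task will send
    int expected = -expectedPerTask;  // negative means "vertex not finished yet"
    int numTasks = 3;                 // from InputInitializerContext.getVertexNumTasks(...)

    // On the vertex SUCCEEDED notification:
    expected = (-1) * expected * numTasks;  // now 6: the exact finish threshold

    // The source is complete once processed == expected; processed > expected
    // indicates extra events and is treated as an error, as in
    // checkForSourceCompletion above.
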
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java
index f6d0a89..8d682e0 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java
@@ -86,13 +86,13 @@ public class HiveSplitGenerator extends InputInitializer {
private static final Logger LOG = LoggerFactory.getLogger(HiveSplitGenerator.class);
private final DynamicPartitionPruner pruner;
- private final Configuration conf;
- private final JobConf jobConf;
- private final MRInputUserPayloadProto userPayloadProto;
- private final MapWork work;
+ private Configuration conf;
+ private JobConf jobConf;
+ private MRInputUserPayloadProto userPayloadProto;
+ private MapWork work;
private final SplitGrouper splitGrouper = new SplitGrouper();
- private final SplitLocationProvider splitLocationProvider;
- private final Optional<Integer> numSplits;
+ private SplitLocationProvider splitLocationProvider;
+ private Optional<Integer> numSplits;
private boolean generateSingleSplit;
@@ -128,11 +128,14 @@ public class HiveSplitGenerator extends InputInitializer {
this.numSplits = Optional.ofNullable(numSplits);
}
- public HiveSplitGenerator(InputInitializerContext initializerContext) throws IOException,
- SerDeException {
+ public HiveSplitGenerator(InputInitializerContext initializerContext) {
super(initializerContext);
-
Preconditions.checkNotNull(initializerContext);
+ pruner = new DynamicPartitionPruner();
+ this.numSplits = Optional.empty();
+ }
+
+ private void prepare(InputInitializerContext initializerContext) throws IOException, SerDeException {
userPayloadProto =
MRInputHelpers.parseMRInputPayload(initializerContext.getInputUserPayload());
@@ -148,18 +151,16 @@ public class HiveSplitGenerator extends InputInitializer {
this.splitLocationProvider =
Utils.getSplitLocationProvider(conf, work.getCacheAffinity(), LOG);
LOG.info("SplitLocationProvider: " + splitLocationProvider);
-
- // Events can start coming in the moment the InputInitializer is created. The pruner
- // must be setup and initialized here so that it sets up it's structures to start accepting events.
- // Setting it up in initialize leads to a window where events may come in before the pruner is
- // initialized, which may cause it to drop events.
- pruner = new DynamicPartitionPruner(initializerContext, work, jobConf);
- this.numSplits = Optional.empty();
}
@SuppressWarnings("unchecked")
@Override
public List<Event> initialize() throws Exception {
+ if (getContext() != null) {
+ // called from Tez AM.
+ prepare(getContext());
+ }
+
// Setup the map work for this thread. Pruning modified the work instance to potentially remove
// partitions. The same work instance must be used when generating splits.
Utilities.setMapWork(jobConf, work);
@@ -169,6 +170,7 @@ public class HiveSplitGenerator extends InputInitializer {
// perform dynamic partition pruning
if (pruner != null) {
+ pruner.initialize(getContext(), work, jobConf);
pruner.prune();
}
@@ -385,6 +387,8 @@ public class HiveSplitGenerator extends InputInitializer {
@Override
public void onVertexStateUpdated(VertexStateUpdate stateUpdate) {
+ // The pruner registers for vertex state updates only after it is ready to handle them,
+ // so we do not need to worry about events arriving before the pruner is initialized.
pruner.processVertex(stateUpdate.getVertexName());
}
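
On the HiveSplitGenerator side the fix is to keep the constructor cheap: the
payload parsing and MapWork setup move out of the constructor into prepare(),
which initialize() invokes. Since Tez constructs the InputInitializer from the
dispatcher but, presumably, runs initialize() on a separate initializer thread,
the dispatcher no longer pays for the heavy setup. A minimal sketch of the
deferral, with invented names (Parsed and parsePayload are illustrative
stand-ins, not Hive or Tez APIs):

    import java.io.IOException;

    class DeferredInitializer {
        static class Parsed { }
        private Parsed parsed;

        DeferredInitializer() {
            // runs on the dispatcher thread: keep construction cheap
        }

        void initialize() throws IOException {
            // runs later, off the dispatcher: safe to do heavy parsing here
            parsed = parsePayload();
        }

        private Parsed parsePayload() throws IOException {
            return new Parsed(); // the real code parses the MRInput payload
        }
    }

The onVertexStateUpdated() comment captures the ordering guarantee that makes
this safe: state updates can only arrive after the pruner has registered for
them, which happens inside prune() after initialization.
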
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestDynamicPartitionPruner.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestDynamicPartitionPruner.java
index d38691e..7ced5a2 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestDynamicPartitionPruner.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestDynamicPartitionPruner.java
@@ -14,7 +14,9 @@
package org.apache.hadoop.hive.ql.exec.tez;
+import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
@@ -25,6 +27,7 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.LongAdder;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
@@ -45,8 +48,9 @@ public class TestDynamicPartitionPruner {
SerDeException {
InputInitializerContext mockInitContext = mock(InputInitializerContext.class);
MapWork mapWork = mock(MapWork.class);
- DynamicPartitionPruner pruner =
- new DynamicPartitionPrunerForEventTesting(mockInitContext, mapWork);
+ DynamicPartitionPrunerForEventTesting pruner =
+ new DynamicPartitionPrunerForEventTesting();
+ pruner.initialize(mockInitContext, mapWork, new JobConf());
PruneRunnable pruneRunnable = new PruneRunnable(pruner);
Thread t = new Thread(pruneRunnable);
@@ -55,6 +59,8 @@ public class TestDynamicPartitionPruner {
pruneRunnable.start();
pruneRunnable.awaitEnd();
// Return immediately. No entries found for pruning. Verified via the timeout.
+ assertEquals(0, pruner.eventsProcessed.intValue());
+ assertEquals(0, pruner.filteredSources.intValue());
} finally {
t.interrupt();
t.join();
@@ -68,8 +74,9 @@ public class TestDynamicPartitionPruner {
doReturn(1).when(mockInitContext).getVertexNumTasks("v1");
MapWork mapWork = createMockMapWork(new TestSource("v1", 1));
- DynamicPartitionPruner pruner =
- new DynamicPartitionPrunerForEventTesting(mockInitContext, mapWork);
+ DynamicPartitionPrunerForEventTesting pruner =
+ new DynamicPartitionPrunerForEventTesting();
+ pruner.initialize(mockInitContext, mapWork, new JobConf());
PruneRunnable pruneRunnable = new PruneRunnable(pruner);
@@ -86,7 +93,9 @@ public class TestDynamicPartitionPruner {
pruner.processVertex("v1");
pruneRunnable.awaitEnd();
- assertFalse(pruneRunnable.inError.get());
+ assertNoError(pruneRunnable);
+ assertEquals(1, pruner.eventsProcessed.intValue());
+ assertEquals(1, pruner.filteredSources.intValue());
} finally {
t.interrupt();
t.join();
@@ -100,8 +109,9 @@ public class TestDynamicPartitionPruner {
doReturn(1).when(mockInitContext).getVertexNumTasks("v1");
MapWork mapWork = createMockMapWork(new TestSource("v1", 1));
- DynamicPartitionPruner pruner =
- new DynamicPartitionPrunerForEventTesting(mockInitContext, mapWork);
+ DynamicPartitionPrunerForEventTesting pruner =
+ new DynamicPartitionPrunerForEventTesting();
+ pruner.initialize(mockInitContext, mapWork, new JobConf());
PruneRunnable pruneRunnable = new PruneRunnable(pruner);
@@ -118,7 +128,9 @@ public class TestDynamicPartitionPruner {
pruner.addEvent(event);
pruneRunnable.awaitEnd();
- assertFalse(pruneRunnable.inError.get());
+ assertNoError(pruneRunnable);
+ assertEquals(1, pruner.eventsProcessed.intValue());
+ assertEquals(1, pruner.filteredSources.intValue());
} finally {
t.interrupt();
t.join();
@@ -131,8 +143,9 @@ public class TestDynamicPartitionPruner {
doReturn(2).when(mockInitContext).getVertexNumTasks("v1");
MapWork mapWork = createMockMapWork(new TestSource("v1", 2));
- DynamicPartitionPruner pruner =
- new DynamicPartitionPrunerForEventTesting(mockInitContext, mapWork);
+ DynamicPartitionPrunerForEventTesting pruner =
+ new DynamicPartitionPrunerForEventTesting();
+ pruner.initialize(mockInitContext, mapWork, new JobConf());
PruneRunnable pruneRunnable = new PruneRunnable(pruner);
Thread t = new Thread(pruneRunnable);
@@ -151,7 +164,9 @@ public class TestDynamicPartitionPruner {
pruner.processVertex("v1");
pruneRunnable.awaitEnd();
- assertFalse(pruneRunnable.inError.get());
+ assertNoError(pruneRunnable);
+ assertEquals(4, pruner.eventsProcessed.intValue());
+ assertEquals(2, pruner.filteredSources.intValue());
} finally {
t.interrupt();
t.join();
@@ -164,8 +179,9 @@ public class TestDynamicPartitionPruner {
doReturn(2).when(mockInitContext).getVertexNumTasks("v1");
MapWork mapWork = createMockMapWork(new TestSource("v1", 2));
- DynamicPartitionPruner pruner =
- new DynamicPartitionPrunerForEventTesting(mockInitContext, mapWork);
+ DynamicPartitionPrunerForEventTesting pruner =
+ new DynamicPartitionPrunerForEventTesting();
+ pruner.initialize(mockInitContext, mapWork, new JobConf());
PruneRunnable pruneRunnable = new PruneRunnable(pruner);
Thread t = new Thread(pruneRunnable);
@@ -184,7 +200,9 @@ public class TestDynamicPartitionPruner {
pruner.addEvent(event);
pruneRunnable.awaitEnd();
- assertFalse(pruneRunnable.inError.get());
+ assertNoError(pruneRunnable);
+ assertEquals(4, pruner.eventsProcessed.intValue());
+ assertEquals(2, pruner.filteredSources.intValue());
} finally {
t.interrupt();
t.join();
@@ -198,8 +216,9 @@ public class TestDynamicPartitionPruner {
doReturn(3).when(mockInitContext).getVertexNumTasks("v2");
MapWork mapWork = createMockMapWork(new TestSource("v1", 2), new TestSource("v2", 1));
- DynamicPartitionPruner pruner =
- new DynamicPartitionPrunerForEventTesting(mockInitContext, mapWork);
+ DynamicPartitionPrunerForEventTesting pruner =
+ new DynamicPartitionPrunerForEventTesting();
+ pruner.initialize(mockInitContext, mapWork, new JobConf());
PruneRunnable pruneRunnable = new PruneRunnable(pruner);
Thread t = new Thread(pruneRunnable);
@@ -228,7 +247,9 @@ public class TestDynamicPartitionPruner {
pruner.processVertex("v2");
pruneRunnable.awaitEnd();
- assertFalse(pruneRunnable.inError.get());
+ assertNoError(pruneRunnable);
+ assertEquals(7, pruner.eventsProcessed.intValue());
+ assertEquals(3, pruner.filteredSources.intValue());
} finally {
t.interrupt();
t.join();
@@ -242,8 +263,9 @@ public class TestDynamicPartitionPruner {
doReturn(3).when(mockInitContext).getVertexNumTasks("v2");
MapWork mapWork = createMockMapWork(new TestSource("v1", 2), new TestSource("v2", 1));
- DynamicPartitionPruner pruner =
- new DynamicPartitionPrunerForEventTesting(mockInitContext, mapWork);
+ DynamicPartitionPrunerForEventTesting pruner =
+ new DynamicPartitionPrunerForEventTesting();
+ pruner.initialize(mockInitContext, mapWork, new JobConf());
PruneRunnable pruneRunnable = new PruneRunnable(pruner);
Thread t = new Thread(pruneRunnable);
@@ -272,7 +294,9 @@ public class TestDynamicPartitionPruner {
pruner.addEvent(eventV2);
pruneRunnable.awaitEnd();
- assertFalse(pruneRunnable.inError.get());
+ assertNoError(pruneRunnable);
+ assertEquals(7, pruner.eventsProcessed.intValue());
+ assertEquals(3, pruner.filteredSources.intValue());
} finally {
t.interrupt();
t.join();
@@ -286,8 +310,9 @@ public class TestDynamicPartitionPruner {
doReturn(3).when(mockInitContext).getVertexNumTasks("v2");
MapWork mapWork = createMockMapWork(new TestSource("v1", 2), new TestSource("v2", 1));
- DynamicPartitionPruner pruner =
- new DynamicPartitionPrunerForEventTesting(mockInitContext, mapWork);
+ DynamicPartitionPrunerForEventTesting pruner =
+ new DynamicPartitionPrunerForEventTesting();
+ pruner.initialize(mockInitContext, mapWork, new JobConf());
PruneRunnable pruneRunnable = new PruneRunnable(pruner);
Thread t = new Thread(pruneRunnable);
@@ -315,22 +340,25 @@ public class TestDynamicPartitionPruner {
pruner.addEvent(eventV2);
pruneRunnable.awaitEnd();
- assertFalse(pruneRunnable.inError.get());
+ assertNoError(pruneRunnable);
+ assertEquals(7, pruner.eventsProcessed.intValue());
+ assertEquals(3, pruner.filteredSources.intValue());
} finally {
t.interrupt();
t.join();
}
}
- @Test(timeout = 20000, expected = IllegalStateException.class)
+ @Test(timeout = 20000)
public void testExtraEvents() throws InterruptedException, IOException, HiveException,
SerDeException {
InputInitializerContext mockInitContext = mock(InputInitializerContext.class);
doReturn(1).when(mockInitContext).getVertexNumTasks("v1");
MapWork mapWork = createMockMapWork(new TestSource("v1", 1));
- DynamicPartitionPruner pruner =
- new DynamicPartitionPrunerForEventTesting(mockInitContext, mapWork);
+ DynamicPartitionPrunerForEventTesting pruner =
+ new DynamicPartitionPrunerForEventTesting();
+ pruner.initialize(mockInitContext, mapWork, new JobConf());
PruneRunnable pruneRunnable = new PruneRunnable(pruner);
@@ -348,7 +376,10 @@ public class TestDynamicPartitionPruner {
pruner.processVertex("v1");
pruneRunnable.awaitEnd();
- assertFalse(pruneRunnable.inError.get());
+ assertTrue(pruneRunnable.inError.get());
+ assertTrue(pruneRunnable.exception instanceof IllegalStateException);
+ assertEquals(2, pruner.eventsProcessed.intValue());
+ assertEquals(0, pruner.filteredSources.intValue());
} finally {
t.interrupt();
t.join();
@@ -362,8 +393,9 @@ public class TestDynamicPartitionPruner {
doReturn(1).when(mockInitContext).getVertexNumTasks("v1");
MapWork mapWork = createMockMapWork(new TestSource("v1", 1));
- DynamicPartitionPruner pruner =
- new DynamicPartitionPrunerForEventTesting(mockInitContext, mapWork);
+ DynamicPartitionPrunerForEventTesting pruner =
+ new DynamicPartitionPrunerForEventTesting();
+ pruner.initialize(mockInitContext, mapWork, new JobConf());
PruneRunnable pruneRunnable = new PruneRunnable(pruner);
@@ -380,13 +412,21 @@ public class TestDynamicPartitionPruner {
Thread.sleep(3000l);
// The pruner should not have completed.
assertFalse(pruneRunnable.ended.get());
- assertFalse(pruneRunnable.inError.get());
+ assertNoError(pruneRunnable);
+ assertEquals(0, pruner.eventsProcessed.intValue());
+ assertEquals(0, pruner.filteredSources.intValue());
} finally {
t.interrupt();
t.join();
}
}
+ private void assertNoError(PruneRunnable pruneRunnable) {
+ if (pruneRunnable.inError.get()) {
+ throw new AssertionError(pruneRunnable.exception);
+ }
+ }
+
private static class PruneRunnable implements Runnable {
final DynamicPartitionPruner pruner;
@@ -396,6 +436,7 @@ public class TestDynamicPartitionPruner {
final AtomicBoolean started = new AtomicBoolean(false);
final AtomicBoolean ended = new AtomicBoolean(false);
final AtomicBoolean inError = new AtomicBoolean(false);
+ Exception exception;
private PruneRunnable(DynamicPartitionPruner pruner) {
this.pruner = pruner;
@@ -433,17 +474,18 @@ public class TestDynamicPartitionPruner {
} finally {
lock.unlock();
}
-
pruner.prune();
- lock.lock();
+ } catch (Exception e) {
+ inError.set(true);
+ exception = e;
+ } finally {
try {
+ lock.lock();
ended.set(true);
endCondition.signal();
} finally {
lock.unlock();
}
- } catch (SerDeException | IOException | InterruptedException | HiveException e) {
- inError.set(true);
}
}
}
@@ -512,11 +554,8 @@ public class TestDynamicPartitionPruner {
private static class DynamicPartitionPrunerForEventTesting extends DynamicPartitionPruner {
-
- public DynamicPartitionPrunerForEventTesting(
- InputInitializerContext context, MapWork work) throws SerDeException {
- super(context, work, new JobConf());
- }
+ LongAdder filteredSources = new LongAdder();
+ LongAdder eventsProcessed = new LongAdder();
@Override
protected SourceInfo createSourceInfo(TableDesc t, ExprNodeDesc partKeyExpr, String columnName, String columnType,
@@ -528,14 +567,14 @@ public class TestDynamicPartitionPruner {
@Override
protected String processPayload(ByteBuffer payload, String sourceName) throws SerDeException,
IOException {
- // No-op: testing events only
+ eventsProcessed.increment();
return sourceName;
}
@Override
protected void prunePartitionSingleSource(String source, SourceInfo si)
throws HiveException {
- // No-op: testing events only
+ filteredSources.increment();
}
}
}
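
For the two-source tests the new counters line up as follows, reading
TestSource(name, n) as registering n pruning columns and each task sending one
event per column (an inference from the assertions, not stated in the patch):
v1 has 2 columns and 2 tasks, so 2 x 2 = 4 events; v2 has 1 column and 3 tasks,
so 1 x 3 = 3 events; 4 + 3 = 7 matches eventsProcessed, and the 2 + 1 = 3
columns match filteredSources. The reworked PruneRunnable also records the
thrown exception so assertNoError() can rethrow it wrapped in an AssertionError,
surfacing the real stack trace instead of a bare assertion failure; likewise,
testExtraEvents now asserts that the captured exception is an
IllegalStateException rather than relying on the expected-exception annotation,
since the failure happens on the pruner thread, not the test thread.
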