You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by fj...@apache.org on 2018/08/29 00:33:07 UTC
[incubator-druid] branch 0.12.3 updated: Don't let catch/finally suppress main exception in IncrementalPublishingKafkaIndexTaskRunner (#6260)
This is an automated email from the ASF dual-hosted git repository.
fjy pushed a commit to branch 0.12.3
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git
The following commit(s) were added to refs/heads/0.12.3 by this push:
new 05c802d Don't let catch/finally suppress main exception in IncrementalPublishingKafkaIndexTaskRunner (#6260)
05c802d is described below
commit 05c802d60fa8b5a5640c1f423f2df37c5577d309
Author: Jonathan Wei <jo...@users.noreply.github.com>
AuthorDate: Tue Aug 28 17:33:05 2018 -0700
Don't let catch/finally suppress main exception in IncrementalPublishingKafkaIndexTaskRunner (#6260)
---
.../io/druid/indexing/kafka/KafkaIndexTask.java | 65 +++++++++++++++++-----
1 file changed, 51 insertions(+), 14 deletions(-)
diff --git a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java
index d0590dc..6ace135 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java
@@ -419,6 +419,7 @@ public class KafkaIndexTask extends AbstractTask implements ChatHandler
)
);
+ Throwable caughtExceptionOuter = null;
try (final KafkaConsumer<byte[], byte[]> consumer = newConsumer()) {
toolbox.getDataSegmentServerAnnouncer().announce();
toolbox.getDruidNodeAnnouncer().announce(discoveryDruidNode);
@@ -512,6 +513,7 @@ public class KafkaIndexTask extends AbstractTask implements ChatHandler
// Could eventually support leader/follower mode (for keeping replicas more in sync)
boolean stillReading = !assignment.isEmpty();
status = Status.READING;
+ Throwable caughtExceptionInner = null;
try {
while (stillReading) {
if (possiblyPause()) {
@@ -717,12 +719,22 @@ public class KafkaIndexTask extends AbstractTask implements ChatHandler
}
catch (Exception e) {
// (1) catch all exceptions while reading from kafka
+ caughtExceptionInner = e;
log.error(e, "Encountered exception in run() before persisting.");
throw e;
}
finally {
log.info("Persisting all pending data");
- driver.persist(committerSupplier.get()); // persist pending data
+ try {
+ driver.persist(committerSupplier.get()); // persist pending data
+ }
+ catch (Exception e) {
+ if (caughtExceptionInner != null) {
+ caughtExceptionInner.addSuppressed(e);
+ } else {
+ throw e;
+ }
+ }
}
synchronized (statusLock) {
@@ -792,9 +804,17 @@ public class KafkaIndexTask extends AbstractTask implements ChatHandler
catch (InterruptedException | RejectedExecutionException e) {
// (2) catch InterruptedException and RejectedExecutionException thrown for the whole ingestion steps including
// the final publishing.
- Futures.allAsList(publishWaitList).cancel(true);
- Futures.allAsList(handOffWaitList).cancel(true);
- appenderator.closeNow();
+ caughtExceptionOuter = e;
+ try {
+ Futures.allAsList(publishWaitList).cancel(true);
+ Futures.allAsList(handOffWaitList).cancel(true);
+ if (appenderator != null) {
+ appenderator.closeNow();
+ }
+ }
+ catch (Exception e2) {
+ e.addSuppressed(e2);
+ }
// handle the InterruptedException that gets wrapped in a RejectedExecutionException
if (e instanceof RejectedExecutionException
&& (e.getCause() == null || !(e.getCause() instanceof InterruptedException))) {
@@ -811,21 +831,38 @@ public class KafkaIndexTask extends AbstractTask implements ChatHandler
}
catch (Exception e) {
// (3) catch all other exceptions thrown for the whole ingestion steps including the final publishing.
- Futures.allAsList(publishWaitList).cancel(true);
- Futures.allAsList(handOffWaitList).cancel(true);
- appenderator.closeNow();
+ caughtExceptionOuter = e;
+ try {
+ Futures.allAsList(publishWaitList).cancel(true);
+ Futures.allAsList(handOffWaitList).cancel(true);
+ if (appenderator != null) {
+ appenderator.closeNow();
+ }
+ }
+ catch (Exception e2) {
+ e.addSuppressed(e2);
+ }
throw e;
}
finally {
- if (driver != null) {
- driver.close();
+ try {
+ if (driver != null) {
+ driver.close();
+ }
+ if (chatHandlerProvider.isPresent()) {
+ chatHandlerProvider.get().unregister(getId());
+ }
+
+ toolbox.getDruidNodeAnnouncer().unannounce(discoveryDruidNode);
+ toolbox.getDataSegmentServerAnnouncer().unannounce();
}
- if (chatHandlerProvider.isPresent()) {
- chatHandlerProvider.get().unregister(getId());
+ catch (Exception e) {
+ if (caughtExceptionOuter != null) {
+ caughtExceptionOuter.addSuppressed(e);
+ } else {
+ throw e;
+ }
}
-
- toolbox.getDruidNodeAnnouncer().unannounce(discoveryDruidNode);
- toolbox.getDataSegmentServerAnnouncer().unannounce();
}
return success();
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org