You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by kk...@apache.org on 2020/05/16 01:46:24 UTC
[kafka] branch 2.5 updated: KAFKA-9955: Prevent SinkTask::close
from shadowing other exceptions (#8618)
This is an automated email from the ASF dual-hosted git repository.
kkarantasis pushed a commit to branch 2.5
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.5 by this push:
new e4fc22c KAFKA-9955: Prevent SinkTask::close from shadowing other exceptions (#8618)
e4fc22c is described below
commit e4fc22c051b9fbc509646ed68243a6e70e835cd3
Author: Greg Harris <gh...@gmail.com>
AuthorDate: Fri May 15 17:53:32 2020 -0700
KAFKA-9955: Prevent SinkTask::close from shadowing other exceptions (#8618)
* If two exceptions are thrown the `closePartitions` exception is suppressed
* Add unit tests that throw exceptions in put and close to verify that
the exceptions are propagated and suppressed appropriately out of WorkerSinkTask::execute
Reviewers: Chris Egerton <ch...@confluent.io>, Nigel Liang <ni...@nigelliang.com>, Konstantine Karantasis <ko...@confluent.io>
---
.../java/org/apache/kafka/common/utils/Utils.java | 12 +++
.../kafka/connect/runtime/WorkerSinkTask.java | 9 +--
.../kafka/connect/runtime/WorkerSinkTaskTest.java | 86 ++++++++++++++++++++++
3 files changed, 102 insertions(+), 5 deletions(-)
diff --git a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
index e9d4cc4..e2160d9 100755
--- a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
+++ b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
@@ -865,6 +865,18 @@ public final class Utils {
}
/**
+ * An {@link AutoCloseable} interface without a throws clause in the signature
+ *
+ * This is used with lambda expressions in try-with-resources clauses
+ * to avoid casting un-checked exceptions to checked exceptions unnecessarily.
+ */
+ @FunctionalInterface
+ public interface UncheckedCloseable extends AutoCloseable {
+ @Override
+ void close();
+ }
+
+ /**
* Closes {@code closeable} and if an exception is thrown, it is logged at the WARN level.
*/
public static void closeQuietly(AutoCloseable closeable, String name) {
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
index 659dadf..3a8c8d4 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
@@ -33,6 +33,7 @@ import org.apache.kafka.common.metrics.stats.Rate;
import org.apache.kafka.common.metrics.stats.Value;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.common.utils.Utils.UncheckedCloseable;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.errors.RetriableException;
@@ -193,13 +194,11 @@ class WorkerSinkTask extends WorkerTask {
@Override
public void execute() {
initializeAndStart();
- try {
+ // Make sure any uncommitted data has been committed and the task has
+ // a chance to clean up its state
+ try (UncheckedCloseable suppressible = this::closePartitions) {
while (!isStopping())
iteration();
- } finally {
- // Make sure any uncommitted data has been committed and the task has
- // a chance to clean up its state
- closePartitions();
}
}
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
index 285cbbe..5dc2f44 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
@@ -36,6 +36,7 @@ import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaAndValue;
+import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.errors.RetriableException;
import org.apache.kafka.connect.runtime.ConnectMetrics.MetricGroup;
import org.apache.kafka.connect.runtime.distributed.ClusterConfigState;
@@ -86,6 +87,7 @@ import java.util.regex.Pattern;
import static java.util.Arrays.asList;
import static java.util.Collections.singleton;
+import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotEquals;
@@ -856,6 +858,90 @@ public class WorkerSinkTaskTest {
PowerMock.verifyAll();
}
+ @Test
+ public void testSinkTasksHandleCloseErrors() throws Exception {
+ createTask(initialState);
+ expectInitializeTask();
+ expectTaskGetTopic(true);
+
+ // Put one message through the task to get some offsets to commit
+ expectConsumerPoll(1);
+ expectConversionAndTransformation(1);
+ sinkTask.put(EasyMock.anyObject());
+ PowerMock.expectLastCall().andVoid();
+
+ // Stop the task during the next put
+ expectConsumerPoll(1);
+ expectConversionAndTransformation(1);
+ sinkTask.put(EasyMock.anyObject());
+ PowerMock.expectLastCall().andAnswer(() -> {
+ workerTask.stop();
+ return null;
+ });
+
+ consumer.wakeup();
+ PowerMock.expectLastCall();
+
+ // Throw another exception while closing the task's assignment
+ EasyMock.expect(sinkTask.preCommit(EasyMock.anyObject()))
+ .andStubReturn(Collections.emptyMap());
+ Throwable closeException = new RuntimeException();
+ sinkTask.close(EasyMock.anyObject());
+ PowerMock.expectLastCall().andThrow(closeException);
+
+ PowerMock.replayAll();
+
+ workerTask.initialize(TASK_CONFIG);
+ try {
+ workerTask.execute();
+ fail("workerTask.execute should have thrown an exception");
+ } catch (RuntimeException e) {
+ PowerMock.verifyAll();
+ assertSame("Exception from close should propagate as-is", closeException, e);
+ }
+ }
+
+ @Test
+ public void testSuppressCloseErrors() throws Exception {
+ createTask(initialState);
+ expectInitializeTask();
+ expectTaskGetTopic(true);
+
+ // Put one message through the task to get some offsets to commit
+ expectConsumerPoll(1);
+ expectConversionAndTransformation(1);
+ sinkTask.put(EasyMock.anyObject());
+ PowerMock.expectLastCall().andVoid();
+
+ // Throw an exception on the next put to trigger shutdown behavior
+ // This exception is the true "cause" of the failure
+ expectConsumerPoll(1);
+ expectConversionAndTransformation(1);
+ Throwable putException = new RuntimeException();
+ sinkTask.put(EasyMock.anyObject());
+ PowerMock.expectLastCall().andThrow(putException);
+
+ // Throw another exception while closing the task's assignment
+ EasyMock.expect(sinkTask.preCommit(EasyMock.anyObject()))
+ .andStubReturn(Collections.emptyMap());
+ Throwable closeException = new RuntimeException();
+ sinkTask.close(EasyMock.anyObject());
+ PowerMock.expectLastCall().andThrow(closeException);
+
+ PowerMock.replayAll();
+
+ workerTask.initialize(TASK_CONFIG);
+ try {
+ workerTask.execute();
+ fail("workerTask.execute should have thrown an exception");
+ } catch (ConnectException e) {
+ PowerMock.verifyAll();
+ assertSame("Exception from put should be the cause", putException, e.getCause());
+ assertTrue("Exception from close should be suppressed", e.getSuppressed().length > 0);
+ assertSame(closeException, e.getSuppressed()[0]);
+ }
+ }
+
// Verify that when commitAsync is called but the supplied callback is not called by the consumer before a
// rebalance occurs, the async callback does not reset the last committed offset from the rebalance.
// See KAFKA-5731 for more information.