You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by kk...@apache.org on 2020/05/16 01:46:53 UTC

[kafka] branch 2.4 updated: KAFKA-9955: Prevent SinkTask::close from shadowing other exceptions (#8618)

This is an automated email from the ASF dual-hosted git repository.

kkarantasis pushed a commit to branch 2.4
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.4 by this push:
     new 03671d3  KAFKA-9955: Prevent SinkTask::close from shadowing other exceptions (#8618)
03671d3 is described below

commit 03671d3bb5ea6e25602de82bf9197be5481c1d9f
Author: Greg Harris <gh...@gmail.com>
AuthorDate: Fri May 15 17:53:32 2020 -0700

    KAFKA-9955: Prevent SinkTask::close from shadowing other exceptions (#8618)
    
    * If two exceptions are thrown the `closePartitions` exception is suppressed
    * Add unit tests that throw exceptions in put and close to verify that
      the exceptions are propagated and suppressed appropriately out of WorkerSinkTask::execute
    
    Reviewers: Chris Egerton <ch...@confluent.io>, Nigel Liang <ni...@nigelliang.com>, Konstantine Karantasis <ko...@confluent.io>
---
 .../java/org/apache/kafka/common/utils/Utils.java  | 12 ++++
 .../kafka/connect/runtime/WorkerSinkTask.java      |  9 ++-
 .../kafka/connect/runtime/WorkerSinkTaskTest.java  | 84 ++++++++++++++++++++++
 3 files changed, 100 insertions(+), 5 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
index c7b70af..5a69db0 100755
--- a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
+++ b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
@@ -826,6 +826,18 @@ public final class Utils {
     }
 
     /**
+     * An {@link AutoCloseable} interface without a throws clause in the signature
+     *
+     * This is used with lambda expressions in try-with-resources clauses
+     * to avoid casting un-checked exceptions to checked exceptions unnecessarily.
+     */
+    @FunctionalInterface
+    public interface UncheckedCloseable extends AutoCloseable {
+        @Override
+        void close();
+    }
+
+    /**
      * Closes {@code closeable} and if an exception is thrown, it is logged at the WARN level.
      */
     public static void closeQuietly(AutoCloseable closeable, String name) {
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
index f89918c..227f2f6 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
@@ -33,6 +33,7 @@ import org.apache.kafka.common.metrics.stats.Rate;
 import org.apache.kafka.common.metrics.stats.Value;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.common.utils.Utils.UncheckedCloseable;
 import org.apache.kafka.connect.data.SchemaAndValue;
 import org.apache.kafka.connect.errors.ConnectException;
 import org.apache.kafka.connect.errors.RetriableException;
@@ -189,13 +190,11 @@ class WorkerSinkTask extends WorkerTask {
     @Override
     public void execute() {
         initializeAndStart();
-        try {
+        // Make sure any uncommitted data has been committed and the task has
+        // a chance to clean up its state
+        try (UncheckedCloseable suppressible = this::closePartitions) {
             while (!isStopping())
                 iteration();
-        } finally {
-            // Make sure any uncommitted data has been committed and the task has
-            // a chance to clean up its state
-            closePartitions();
         }
     }
 
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
index 8c93528..c6d25a4 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
@@ -35,6 +35,7 @@ import org.apache.kafka.common.record.TimestampType;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.connect.data.Schema;
 import org.apache.kafka.connect.data.SchemaAndValue;
+import org.apache.kafka.connect.errors.ConnectException;
 import org.apache.kafka.connect.errors.RetriableException;
 import org.apache.kafka.connect.runtime.ConnectMetrics.MetricGroup;
 import org.apache.kafka.connect.runtime.distributed.ClusterConfigState;
@@ -83,6 +84,7 @@ import java.util.regex.Pattern;
 
 import static java.util.Arrays.asList;
 import static java.util.Collections.singleton;
+import static org.junit.Assert.assertSame;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotEquals;
@@ -843,6 +845,88 @@ public class WorkerSinkTaskTest {
         PowerMock.verifyAll();
     }
 
+    @Test
+    public void testSinkTasksHandleCloseErrors() throws Exception {
+        createTask(initialState);
+        expectInitializeTask();
+
+        // Put one message through the task to get some offsets to commit
+        expectConsumerPoll(1);
+        expectConversionAndTransformation(1);
+        sinkTask.put(EasyMock.anyObject());
+        PowerMock.expectLastCall().andVoid();
+
+        // Stop the task during the next put
+        expectConsumerPoll(1);
+        expectConversionAndTransformation(1);
+        sinkTask.put(EasyMock.anyObject());
+        PowerMock.expectLastCall().andAnswer(() -> {
+            workerTask.stop();
+            return null;
+        });
+
+        consumer.wakeup();
+        PowerMock.expectLastCall();
+
+        // Throw another exception while closing the task's assignment
+        EasyMock.expect(sinkTask.preCommit(EasyMock.anyObject()))
+            .andStubReturn(Collections.emptyMap());
+        Throwable closeException = new RuntimeException();
+        sinkTask.close(EasyMock.anyObject());
+        PowerMock.expectLastCall().andThrow(closeException);
+
+        PowerMock.replayAll();
+
+        workerTask.initialize(TASK_CONFIG);
+        try {
+            workerTask.execute();
+            fail("workerTask.execute should have thrown an exception");
+        } catch (RuntimeException e) {
+            PowerMock.verifyAll();
+            assertSame("Exception from close should propagate as-is", closeException, e);
+        }
+    }
+
+    @Test
+    public void testSuppressCloseErrors() throws Exception {
+        createTask(initialState);
+        expectInitializeTask();
+
+        // Put one message through the task to get some offsets to commit
+        expectConsumerPoll(1);
+        expectConversionAndTransformation(1);
+        sinkTask.put(EasyMock.anyObject());
+        PowerMock.expectLastCall().andVoid();
+
+        // Throw an exception on the next put to trigger shutdown behavior
+        // This exception is the true "cause" of the failure
+        expectConsumerPoll(1);
+        expectConversionAndTransformation(1);
+        Throwable putException = new RuntimeException();
+        sinkTask.put(EasyMock.anyObject());
+        PowerMock.expectLastCall().andThrow(putException);
+
+        // Throw another exception while closing the task's assignment
+        EasyMock.expect(sinkTask.preCommit(EasyMock.anyObject()))
+            .andStubReturn(Collections.emptyMap());
+        Throwable closeException = new RuntimeException();
+        sinkTask.close(EasyMock.anyObject());
+        PowerMock.expectLastCall().andThrow(closeException);
+
+        PowerMock.replayAll();
+
+        workerTask.initialize(TASK_CONFIG);
+        try {
+            workerTask.execute();
+            fail("workerTask.execute should have thrown an exception");
+        } catch (ConnectException e) {
+            PowerMock.verifyAll();
+            assertSame("Exception from put should be the cause", putException, e.getCause());
+            assertTrue("Exception from close should be suppressed", e.getSuppressed().length > 0);
+            assertSame(closeException, e.getSuppressed()[0]);
+        }
+    }
+
     // Verify that when commitAsync is called but the supplied callback is not called by the consumer before a
     // rebalance occurs, the async callback does not reset the last committed offset from the rebalance.
     // See KAFKA-5731 for more information.