Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/05/05 16:54:29 UTC

[GitHub] [kafka] C0urante commented on a change in pull request #8618: KAFKA-9955: Prevent SinkTask::close from shadowing other exceptions

C0urante commented on a change in pull request #8618:
URL: https://github.com/apache/kafka/pull/8618#discussion_r420248205



##########
File path: connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
##########
@@ -856,6 +857,49 @@ public void run() {
         PowerMock.verifyAll();
     }
 
+    @Test
+    public void testSinkTasksHandleCloseErrors() throws Exception {
+        createTask(initialState);
+        expectInitializeTask();
+        expectTaskGetTopic(true);
+
+        // Put one message through the task to get some offsets to commit
+        expectConsumerPoll(1);
+        expectConversionAndTransformation(1);
+        sinkTask.put(EasyMock.anyObject());
+        PowerMock.expectLastCall().andVoid();
+
+        // Throw an exception on the next put to trigger shutdown behavior
+        // This exception is the true "cause" of the failure
+        expectConsumerPoll(1);
+        expectConversionAndTransformation(1);
+        Throwable a = new RuntimeException();

Review comment:
       If we're going to refer to this later while making assertions, a more descriptive name might help readability. Something like `putFailure` here and `closeFailure` below, maybe?
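
As a rough illustration of how the suggested names might read at the assertion site, here is a minimal, self-contained sketch. `NamedFailuresDemo` and `runTask` are hypothetical stand-ins, not code from this PR, and using `Throwable.addSuppressed` to keep the close failure is an assumption for the demo rather than a statement of how `WorkerSinkTask` handles it.

```java
public class NamedFailuresDemo {
    // Hypothetical stand-in for a task run: put fails first, then close fails during cleanup.
    static void runTask(RuntimeException putFailure, RuntimeException closeFailure) {
        try {
            throw putFailure;
        } catch (RuntimeException e) {
            try {
                throw closeFailure;
            } catch (RuntimeException fromClose) {
                // Assumption for this demo: keep the original failure primary
                // and record the cleanup failure alongside it.
                e.addSuppressed(fromClose);
            }
            throw e;
        }
    }

    public static void main(String[] args) {
        RuntimeException putFailure = new RuntimeException("put failed");
        RuntimeException closeFailure = new RuntimeException("close failed");
        try {
            runTask(putFailure, closeFailure);
        } catch (RuntimeException thrown) {
            // With descriptive names, each check states its own intent.
            System.out.println(thrown == putFailure);                      // true
            System.out.println(thrown.getSuppressed()[0] == closeFailure); // true
        }
    }
}
```

With `a` and `b`, the same two checks would force the reader to scroll back up to work out which exception came from which call.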

##########
File path: connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
##########
@@ -856,6 +857,49 @@ public void run() {
         PowerMock.verifyAll();
     }
 
+    @Test
+    public void testSinkTasksHandleCloseErrors() throws Exception {
+        createTask(initialState);
+        expectInitializeTask();
+        expectTaskGetTopic(true);
+
+        // Put one message through the task to get some offsets to commit
+        expectConsumerPoll(1);
+        expectConversionAndTransformation(1);
+        sinkTask.put(EasyMock.anyObject());
+        PowerMock.expectLastCall().andVoid();
+
+        // Throw an exception on the next put to trigger shutdown behavior
+        // This exception is the true "cause" of the failure
+        expectConsumerPoll(1);
+        expectConversionAndTransformation(1);
+        Throwable a = new RuntimeException();
+        sinkTask.put(EasyMock.anyObject());
+        PowerMock.expectLastCall().andThrow(a);
+
+        // Throw another exception while closing the task's assignment
+        EasyMock.expect(sinkTask.preCommit(EasyMock.anyObject()))
+            .andStubReturn(Collections.emptyMap());
+        Throwable b = new RuntimeException();
+        sinkTask.close(EasyMock.anyObject());
+        PowerMock.expectLastCall().andThrow(b);
+
+        PowerMock.replayAll();
+
+        workerTask.initialize(TASK_CONFIG);
+        try {
+            workerTask.execute();
+            fail();
+        } catch (Throwable t) {
+            PowerMock.verifyAll();
+            // The exception from close should not shadow the exception from put.

Review comment:
       Might be better to use this text as the message for the assertions instead of putting it here as a comment?
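
A small sketch of what that could look like, assuming the three-argument `assertSame(String message, Object expected, Object actual)` form from JUnit 4's `Assert`. To keep the example runnable without JUnit on the classpath, `assertSame` below is a hand-written stand-in with the same argument order; `AssertionMessageDemo` and `failureMessageFor` are hypothetical names.

```java
public class AssertionMessageDemo {
    static final String MESSAGE =
        "The exception from close should not shadow the exception from put";

    // Stand-in mirroring the (message, expected, actual) argument order of JUnit 4's assertSame.
    static void assertSame(String message, Object expected, Object actual) {
        if (expected != actual) {
            throw new AssertionError(message + ": expected same <" + expected + "> but was <" + actual + ">");
        }
    }

    // Returns the failure message produced by the assertion, or null if it passes.
    static String failureMessageFor(Throwable expected, Throwable actual) {
        try {
            assertSame(MESSAGE, expected, actual);
            return null;
        } catch (AssertionError e) {
            return e.getMessage();
        }
    }

    public static void main(String[] args) {
        Throwable putFailure = new RuntimeException("put failed");
        Throwable closeFailure = new RuntimeException("close failed");
        // Passing case: nothing is reported.
        System.out.println(failureMessageFor(putFailure, putFailure));
        // Failing case: the explanation shows up in the test output itself.
        System.out.println(failureMessageFor(putFailure, closeFailure));
    }
}
```

The upside over a comment is that the explanation appears in the failure report, so whoever hits the failure sees the intent without opening the test source.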

##########
File path: connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
##########
@@ -856,6 +857,49 @@ public void run() {
         PowerMock.verifyAll();
     }
 
+    @Test
+    public void testSinkTasksHandleCloseErrors() throws Exception {
+        createTask(initialState);
+        expectInitializeTask();
+        expectTaskGetTopic(true);
+
+        // Put one message through the task to get some offsets to commit
+        expectConsumerPoll(1);
+        expectConversionAndTransformation(1);
+        sinkTask.put(EasyMock.anyObject());
+        PowerMock.expectLastCall().andVoid();
+
+        // Throw an exception on the next put to trigger shutdown behavior
+        // This exception is the true "cause" of the failure
+        expectConsumerPoll(1);
+        expectConversionAndTransformation(1);
+        Throwable a = new RuntimeException();
+        sinkTask.put(EasyMock.anyObject());
+        PowerMock.expectLastCall().andThrow(a);
+
+        // Throw another exception while closing the task's assignment
+        EasyMock.expect(sinkTask.preCommit(EasyMock.anyObject()))
+            .andStubReturn(Collections.emptyMap());
+        Throwable b = new RuntimeException();
+        sinkTask.close(EasyMock.anyObject());
+        PowerMock.expectLastCall().andThrow(b);
+
+        PowerMock.replayAll();
+
+        workerTask.initialize(TASK_CONFIG);
+        try {
+            workerTask.execute();
+            fail();
+        } catch (Throwable t) {

Review comment:
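       The call to `fail()` above is going to get caught here, isn't it? `fail()` throws an `AssertionError`, which is a `Throwable`, so this `catch` block would catch it too. I think that might make this difficult to debug if a future change causes the test to fail for some reason.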
   
   Based on the `deliverMessages` method where `SinkTask::put` is invoked, it looks like we might be able to narrow this down to a `ConnectException`.
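
To make the concern concrete, here is a minimal sketch that runs without JUnit. `BroadCatchDemo`, `executeWithoutThrowing`, and the local `fail` are hypothetical stand-ins (the local `fail` throws `AssertionError`, as JUnit's does), and `RuntimeException` stands in for `ConnectException` so the example stays self-contained.

```java
public class BroadCatchDemo {
    // Hypothetical stand-in for the code under test: simulates a regression where nothing is thrown.
    static void executeWithoutThrowing() {
    }

    // Stand-in for JUnit's fail(): throws AssertionError.
    static void fail(String message) {
        throw new AssertionError(message);
    }

    // Mirrors the shape of the test: catch (Throwable) also catches the AssertionError from fail().
    static String broadCatch() {
        try {
            executeWithoutThrowing();
            fail("expected execute() to throw");
            return "not reached";
        } catch (Throwable t) {
            return "caught " + t.getClass().getSimpleName();
        }
    }

    // Narrower catch: AssertionError is not a RuntimeException, so it propagates to the caller.
    static String narrowCatch() {
        try {
            executeWithoutThrowing();
            fail("expected execute() to throw");
            return "not reached";
        } catch (RuntimeException e) {
            return "caught " + e.getClass().getSimpleName();
        }
    }

    public static void main(String[] args) {
        System.out.println(broadCatch()); // prints "caught AssertionError"
        try {
            narrowCatch();
        } catch (AssertionError e) {
            System.out.println("propagated: " + e.getMessage()); // prints "propagated: expected execute() to throw"
        }
    }
}
```

With the broad catch, the assertions inside the `catch` block end up running against the `AssertionError` from `fail()`, so the reported failure points at the wrong thing. With the narrower type, the original "expected execute() to throw" message reaches the test runner directly.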



