You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/05/05 03:58:40 UTC

[GitHub] [kafka] gharris1727 opened a new pull request #8618: KAFKA-9955: Prevent SinkTask::close from shadowing other exceptions

gharris1727 opened a new pull request #8618:
URL: https://github.com/apache/kafka/pull/8618


   * Catches exceptions from SinkTask::close call in WorkerSinkTask
   * Logs exception at ERROR level but does not propagate exception upward
   * Add unit test that throws exceptions in put and close to verify that
     the exception from put is propagated out of WorkerSinkTask::execute
   
   Signed-off-by: Greg Harris <gr...@confluent.io>
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] gharris1727 commented on a change in pull request #8618: KAFKA-9955: Prevent SinkTask::close from shadowing other exceptions

Posted by GitBox <gi...@apache.org>.
gharris1727 commented on a change in pull request #8618:
URL: https://github.com/apache/kafka/pull/8618#discussion_r420950647



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
##########
@@ -193,13 +194,11 @@ public void transitionTo(TargetState state) {
     @Override
     public void execute() {
         initializeAndStart();
-        try {
+        // Make sure any uncommitted data has been committed and the task has
+        // a chance to clean up its state
+        try (QuietClosable ignored = this::closePartitions) {

Review comment:
       I used `ignored` as a way to trigger the IDE to avoid highlighting the unused variable.
   I can change this to suppressible since the behavior is better described (suppressible "if another exception occurs first").




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] gharris1727 commented on pull request #8618: KAFKA-9955: Prevent SinkTask::close from shadowing other exceptions

Posted by GitBox <gi...@apache.org>.
gharris1727 commented on pull request #8618:
URL: https://github.com/apache/kafka/pull/8618#issuecomment-624196656


   @rhauch @kkonstantine This is ready for committer review. Thanks for taking a look!


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] C0urante commented on a change in pull request #8618: KAFKA-9955: Prevent SinkTask::close from shadowing other exceptions

Posted by GitBox <gi...@apache.org>.
C0urante commented on a change in pull request #8618:
URL: https://github.com/apache/kafka/pull/8618#discussion_r420248205



##########
File path: connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
##########
@@ -856,6 +857,49 @@ public void run() {
         PowerMock.verifyAll();
     }
 
+    @Test
+    public void testSinkTasksHandleCloseErrors() throws Exception {
+        createTask(initialState);
+        expectInitializeTask();
+        expectTaskGetTopic(true);
+
+        // Put one message through the task to get some offsets to commit
+        expectConsumerPoll(1);
+        expectConversionAndTransformation(1);
+        sinkTask.put(EasyMock.anyObject());
+        PowerMock.expectLastCall().andVoid();
+
+        // Throw an exception on the next put to trigger shutdown behavior
+        // This exception is the true "cause" of the failure
+        expectConsumerPoll(1);
+        expectConversionAndTransformation(1);
+        Throwable a = new RuntimeException();

Review comment:
       If we're going to refer to this later while making assertions, a more descriptive name might help readability. Something like `putFailure` here and `closeFailure` below, maybe?

##########
File path: connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
##########
@@ -856,6 +857,49 @@ public void run() {
         PowerMock.verifyAll();
     }
 
+    @Test
+    public void testSinkTasksHandleCloseErrors() throws Exception {
+        createTask(initialState);
+        expectInitializeTask();
+        expectTaskGetTopic(true);
+
+        // Put one message through the task to get some offsets to commit
+        expectConsumerPoll(1);
+        expectConversionAndTransformation(1);
+        sinkTask.put(EasyMock.anyObject());
+        PowerMock.expectLastCall().andVoid();
+
+        // Throw an exception on the next put to trigger shutdown behavior
+        // This exception is the true "cause" of the failure
+        expectConsumerPoll(1);
+        expectConversionAndTransformation(1);
+        Throwable a = new RuntimeException();
+        sinkTask.put(EasyMock.anyObject());
+        PowerMock.expectLastCall().andThrow(a);
+
+        // Throw another exception while closing the task's assignment
+        EasyMock.expect(sinkTask.preCommit(EasyMock.anyObject()))
+            .andStubReturn(Collections.emptyMap());
+        Throwable b = new RuntimeException();
+        sinkTask.close(EasyMock.anyObject());
+        PowerMock.expectLastCall().andThrow(b);
+
+        PowerMock.replayAll();
+
+        workerTask.initialize(TASK_CONFIG);
+        try {
+            workerTask.execute();
+            fail();
+        } catch (Throwable t) {
+            PowerMock.verifyAll();
+            // The exception from close should not shadow the exception from put.

Review comment:
       Might be better to use this text as the message for the assertions instead of putting it here as a comment?

##########
File path: connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
##########
@@ -856,6 +857,49 @@ public void run() {
         PowerMock.verifyAll();
     }
 
+    @Test
+    public void testSinkTasksHandleCloseErrors() throws Exception {
+        createTask(initialState);
+        expectInitializeTask();
+        expectTaskGetTopic(true);
+
+        // Put one message through the task to get some offsets to commit
+        expectConsumerPoll(1);
+        expectConversionAndTransformation(1);
+        sinkTask.put(EasyMock.anyObject());
+        PowerMock.expectLastCall().andVoid();
+
+        // Throw an exception on the next put to trigger shutdown behavior
+        // This exception is the true "cause" of the failure
+        expectConsumerPoll(1);
+        expectConversionAndTransformation(1);
+        Throwable a = new RuntimeException();
+        sinkTask.put(EasyMock.anyObject());
+        PowerMock.expectLastCall().andThrow(a);
+
+        // Throw another exception while closing the task's assignment
+        EasyMock.expect(sinkTask.preCommit(EasyMock.anyObject()))
+            .andStubReturn(Collections.emptyMap());
+        Throwable b = new RuntimeException();
+        sinkTask.close(EasyMock.anyObject());
+        PowerMock.expectLastCall().andThrow(b);
+
+        PowerMock.replayAll();
+
+        workerTask.initialize(TASK_CONFIG);
+        try {
+            workerTask.execute();
+            fail();
+        } catch (Throwable t) {

Review comment:
       The call to `fail()` above is going to get caught here, isn't it? Think that might make this difficult to debug if a future change causes this to fail for some reason.
   
   Based on the `deliverMessages` method where `SinkTask::put` is invoked, it looks like we might be able to narrow this down to a `ConnectException`.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] kkonstantine commented on a change in pull request #8618: KAFKA-9955: Prevent SinkTask::close from shadowing other exceptions

Posted by GitBox <gi...@apache.org>.
kkonstantine commented on a change in pull request #8618:
URL: https://github.com/apache/kafka/pull/8618#discussion_r421005156



##########
File path: connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
##########
@@ -856,6 +858,47 @@ public void run() {
         PowerMock.verifyAll();
     }
 
+    @Test
+    public void testSinkTasksHandleCloseErrors() throws Exception {

Review comment:
       Let me see if I understand what you are describing as `wrapped`. 
   My use case is as follows: 
   `SinkTask#close` attempts to release resources and if it fails it throws a `ConnectException` as we'd expect from connector developers to do (currently it throws a `RuntimeException` which might be less representative). 
   
   With your fix this exception can appear as suppressed when an exception happens in `SinkTask#put` and that's what your test is guarding against. 
   
   My point is to add the missing test case for when the exception on `close` is the only exception that is thrown. There is a variety of ways to do that, but I agree with you, this test is not there now. However, I don't think this necessarily makes it out of scope. 




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] kkonstantine commented on a change in pull request #8618: KAFKA-9955: Prevent SinkTask::close from shadowing other exceptions

Posted by GitBox <gi...@apache.org>.
kkonstantine commented on a change in pull request #8618:
URL: https://github.com/apache/kafka/pull/8618#discussion_r421189122



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
##########
@@ -193,13 +194,11 @@ public void transitionTo(TargetState state) {
     @Override
     public void execute() {
         initializeAndStart();
-        try {
+        // Make sure any uncommitted data has been committed and the task has
+        // a chance to clean up its state
+        try (UncheckedCloseable supressible = this::closePartitions) {

Review comment:
       Although I dig supressible with 1 p, like "sup? exception what are you up to?" 
   I'm afraid I'll have to abide by the usual rules and recommend: `suppressible` here 😄 
   (probably the IDE will complain about a typo too). 




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] kkonstantine commented on a change in pull request #8618: KAFKA-9955: Prevent SinkTask::close from shadowing other exceptions

Posted by GitBox <gi...@apache.org>.
kkonstantine commented on a change in pull request #8618:
URL: https://github.com/apache/kafka/pull/8618#discussion_r420895742



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
##########
@@ -193,13 +194,11 @@ public void transitionTo(TargetState state) {
     @Override
     public void execute() {
         initializeAndStart();
-        try {
+        // Make sure any uncommitted data has been committed and the task has
+        // a chance to clean up its state
+        try (QuietClosable ignored = this::closePartitions) {

Review comment:
       Again naming here can be misleading. `ignored` is more like unused in the try block. 
   But also that's not the point of this idiom. It's about suppressing exceptions from `finally` instead of the originator. 
   
   How about `suppressible`? Also, `unused` might be even better, because `ignored` is untrue especially if `closePartitions` is the only method that throws. But I think `suppressible` highlights the intentions here specifically. 

##########
File path: clients/src/main/java/org/apache/kafka/common/utils/Utils.java
##########
@@ -885,6 +885,18 @@ public static void closeAll(Closeable... closeables) throws IOException {
             throw exception;
     }
 
+    /**
+     * An {@link AutoCloseable} interface without a throws clause in the signature
+     *
+     * This is used with lambda expressions in try-with-resources clauses
+     * to avoid casting un-checked exceptions to checked exceptions unnecessarily.
+     */
+    @FunctionalInterface
+    public interface QuietClosable extends AutoCloseable {

Review comment:
       ```suggestion
       public interface QuietClosable extends AutoCloseable {
   ```
   I don't think the name is very accurate. The interface does not prevent implementation of close from throwing (as opposed to the methods below) and its demonstrated use does not either. (there's also a typo, if typos in non-words matter). 
   
   How about `UncheckedCloseable`?

##########
File path: connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
##########
@@ -856,6 +858,47 @@ public void run() {
         PowerMock.verifyAll();
     }
 
+    @Test
+    public void testSinkTasksHandleCloseErrors() throws Exception {

Review comment:
       Can you also write a test where an exception is thrown only by `sinkTask.close`? Actually, we could keep the name for this new test here _as is_, and the new test could be named in a way that tells us that the exceptions on close are suppressed in the presence of exceptions in the main try block. 




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] kkonstantine commented on pull request #8618: KAFKA-9955: Prevent SinkTask::close from shadowing other exceptions

Posted by GitBox <gi...@apache.org>.
kkonstantine commented on pull request #8618:
URL: https://github.com/apache/kafka/pull/8618#issuecomment-625502568


   retest this please


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] gharris1727 commented on a change in pull request #8618: KAFKA-9955: Prevent SinkTask::close from shadowing other exceptions

Posted by GitBox <gi...@apache.org>.
gharris1727 commented on a change in pull request #8618:
URL: https://github.com/apache/kafka/pull/8618#discussion_r421798861



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
##########
@@ -193,13 +194,11 @@ public void transitionTo(TargetState state) {
     @Override
     public void execute() {
         initializeAndStart();
-        try {
+        // Make sure any uncommitted data has been committed and the task has
+        // a chance to clean up its state
+        try (UncheckedCloseable supressible = this::closePartitions) {

Review comment:
       ```suggestion
           try (UncheckedCloseable suppressible = this::closePartitions) {
   ```




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] kkonstantine commented on pull request #8618: KAFKA-9955: Prevent SinkTask::close from shadowing other exceptions

Posted by GitBox <gi...@apache.org>.
kkonstantine commented on pull request #8618:
URL: https://github.com/apache/kafka/pull/8618#issuecomment-624979305


   ok to test


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] gharris1727 commented on a change in pull request #8618: KAFKA-9955: Prevent SinkTask::close from shadowing other exceptions

Posted by GitBox <gi...@apache.org>.
gharris1727 commented on a change in pull request #8618:
URL: https://github.com/apache/kafka/pull/8618#discussion_r420298081



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
##########
@@ -193,13 +194,11 @@ public void transitionTo(TargetState state) {
     @Override
     public void execute() {
         initializeAndStart();
-        try {
+        // Make sure any uncommitted data has been committed and the task has
+        // a chance to clean up its state
+        try (QuietClosable ignored = this::closePartitions) {

Review comment:
       This PR is not changing any of the printing logic, and that's still handled by the caller, `WorkerTask::doRun`. This is roughly what a suppressed exception looks like when it gets logged (from the test setup, not a live connector):
   
   ```
   org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
   	at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:569)
   	at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:327)
   	at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:229)
   	at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:201)
   	<snipped>
   	Suppressed: java.lang.RuntimeException
   		at org.easymock.internal.MockInvocationHandler.invoke(MockInvocationHandler.java:46)
   		at org.easymock.internal.ObjectMethodsFilter.invoke(ObjectMethodsFilter.java:101)
   		at org.easymock.internal.ClassProxyFactory$MockMethodInterceptor.intercept(ClassProxyFactory.java:97)
   		at org.apache.kafka.connect.sink.SinkTask$$EnhancerByCGLIB$$713f645b.close(&lt;generated&gt;)
   		at org.apache.kafka.connect.runtime.WorkerSinkTask.commitOffsets(WorkerSinkTask.java:402)
   		at org.apache.kafka.connect.runtime.WorkerSinkTask.closePartitions(WorkerSinkTask.java:599)
   		at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:202)
   		... 57 more
   Caused by: java.lang.RuntimeException
   	at org.easymock.internal.MockInvocationHandler.invoke(MockInvocationHandler.java:46)
   	at org.easymock.internal.ObjectMethodsFilter.invoke(ObjectMethodsFilter.java:101)
   	at org.easymock.internal.ClassProxyFactory$MockMethodInterceptor.intercept(ClassProxyFactory.java:68)
   	at org.apache.kafka.connect.sink.SinkTask$$EnhancerByCGLIB$$713f645b.put(&lt;generated&gt;)
   	at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:547)
   	... 60 more
   ```
   Suppressed exceptions are a native Java feature, and log4j supports printing their stacktraces.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] gharris1727 commented on a change in pull request #8618: KAFKA-9955: Prevent SinkTask::close from shadowing other exceptions

Posted by GitBox <gi...@apache.org>.
gharris1727 commented on a change in pull request #8618:
URL: https://github.com/apache/kafka/pull/8618#discussion_r420998369



##########
File path: connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
##########
@@ -856,6 +858,47 @@ public void run() {
         PowerMock.verifyAll();
     }
 
+    @Test
+    public void testSinkTasksHandleCloseErrors() throws Exception {

Review comment:
       Ive written the test as-is, and the exception from close is caught by WorkerTask::doRun.
   
   However, it never gets wrapped in a ConnectException like exceptions from the other connector methods, but i'm not sure that it's in-scope to change this in this PR.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] kkonstantine merged pull request #8618: KAFKA-9955: Prevent SinkTask::close from shadowing other exceptions

Posted by GitBox <gi...@apache.org>.
kkonstantine merged pull request #8618:
URL: https://github.com/apache/kafka/pull/8618


   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] gharris1727 commented on a change in pull request #8618: KAFKA-9955: Prevent SinkTask::close from shadowing other exceptions

Posted by GitBox <gi...@apache.org>.
gharris1727 commented on a change in pull request #8618:
URL: https://github.com/apache/kafka/pull/8618#discussion_r421009491



##########
File path: connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
##########
@@ -856,6 +858,47 @@ public void run() {
         PowerMock.verifyAll();
     }
 
+    @Test
+    public void testSinkTasksHandleCloseErrors() throws Exception {

Review comment:
       I added that test, so please look at the diff.
   
   By wrapping, i'm completely disregarding the behavior from the connector, I'm only discussing the additional layers of exceptions added by the framework before printing.
   
   At the moment, the RuntimeException from put is wrapped by a ConnectException, but the RuntimeException from close is never wrapped in a ConnectException. I'm questioning whether this is the ideal behavior, and whether we should add that wrapping layer, or consider it out-of-scope for this PR.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] rhauch commented on pull request #8618: KAFKA-9955: Prevent SinkTask::close from shadowing other exceptions

Posted by GitBox <gi...@apache.org>.
rhauch commented on pull request #8618:
URL: https://github.com/apache/kafka/pull/8618#issuecomment-628898153


   ok to test


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] gharris1727 commented on pull request #8618: KAFKA-9955: Prevent SinkTask::close from shadowing other exceptions

Posted by GitBox <gi...@apache.org>.
gharris1727 commented on pull request #8618:
URL: https://github.com/apache/kafka/pull/8618#issuecomment-623846094


   @C0urante @ncliang Could you take a look at this when you have a chance?


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] kkonstantine commented on a change in pull request #8618: KAFKA-9955: Prevent SinkTask::close from shadowing other exceptions

Posted by GitBox <gi...@apache.org>.
kkonstantine commented on a change in pull request #8618:
URL: https://github.com/apache/kafka/pull/8618#discussion_r421188159



##########
File path: connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
##########
@@ -856,6 +858,47 @@ public void run() {
         PowerMock.verifyAll();
     }
 
+    @Test
+    public void testSinkTasksHandleCloseErrors() throws Exception {

Review comment:
       That's the testing I had in mind. That's great. 
   No need to change the exception type right now. 




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] ncliang commented on a change in pull request #8618: KAFKA-9955: Prevent SinkTask::close from shadowing other exceptions

Posted by GitBox <gi...@apache.org>.
ncliang commented on a change in pull request #8618:
URL: https://github.com/apache/kafka/pull/8618#discussion_r420288853



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
##########
@@ -193,13 +194,11 @@ public void transitionTo(TargetState state) {
     @Override
     public void execute() {
         initializeAndStart();
-        try {
+        // Make sure any uncommitted data has been committed and the task has
+        // a chance to clean up its state
+        try (QuietClosable ignored = this::closePartitions) {

Review comment:
       I am not sure I understand how the suppressed exception is logged and not just silently swallowed? Do we need to call ` getSuppressed` and log those somewhere or use one of those `closeQuietly` methods on `Utils`?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] gharris1727 commented on a change in pull request #8618: KAFKA-9955: Prevent SinkTask::close from shadowing other exceptions

Posted by GitBox <gi...@apache.org>.
gharris1727 commented on a change in pull request #8618:
URL: https://github.com/apache/kafka/pull/8618#discussion_r420998369



##########
File path: connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
##########
@@ -856,6 +858,47 @@ public void run() {
         PowerMock.verifyAll();
     }
 
+    @Test
+    public void testSinkTasksHandleCloseErrors() throws Exception {

Review comment:
       Ive written the test as described, and verified that the exception from close is caught by WorkerTask::doRun.
   
   However, it never gets wrapped in a ConnectException like exceptions from the other connector methods, but i'm not sure that it's in-scope to change this in this PR.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org