You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by kk...@apache.org on 2020/05/06 17:25:24 UTC

[kafka] 01/02: KAFKA-9830: Implement AutoCloseable in ErrorReporter and subclasses (#8442)

This is an automated email from the ASF dual-hosted git repository.

kkarantasis pushed a commit to branch 2.3
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit 4af11dec94a09542b7f4342e68f68ae93b4be120
Author: Greg Harris <gr...@confluent.io>
AuthorDate: Wed Apr 29 17:07:01 2020 -0700

    KAFKA-9830: Implement AutoCloseable in ErrorReporter and subclasses (#8442)
    
    * The DeadLetterQueueReporter has a KafkaProducer that it must close to clean up resources
    * Currently, the producer and its threads are leaked every time a task is stopped
    * Responsibility for cleaning up ErrorReporters is transitively assigned to the
        ProcessingContext, RetryWithToleranceOperator, and WorkerSinkTask/WorkerSourceTask classes
    * One new unit test in ErrorReporterTest asserts that the producer is closed by the dlq reporter
    
    Reviewers: Arjun Satish <ar...@confluent.io>, Chris Egerton <ch...@confluent.io>, Chia-Ping Tsai <ch...@gmail.com>, Konstantine Karantasis <ko...@confluent.io>
---
 .../kafka/connect/runtime/WorkerSinkTask.java      |  2 +
 .../kafka/connect/runtime/WorkerSourceTask.java    |  2 +
 .../runtime/errors/DeadLetterQueueReporter.java    |  5 ++
 .../connect/runtime/errors/ErrorReporter.java      |  4 +-
 .../connect/runtime/errors/ProcessingContext.java  | 18 ++++-
 .../runtime/errors/RetryWithToleranceOperator.java |  7 +-
 .../connect/runtime/ErrorHandlingTaskTest.java     | 89 ++++++++++++++++++++++
 .../connect/runtime/errors/ErrorReporterTest.java  | 14 ++++
 8 files changed, 138 insertions(+), 3 deletions(-)

diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
index 067d604..77e5848 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
@@ -32,6 +32,7 @@ import org.apache.kafka.common.metrics.stats.Rate;
 import org.apache.kafka.common.metrics.stats.Total;
 import org.apache.kafka.common.metrics.stats.Value;
 import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.connect.data.SchemaAndValue;
 import org.apache.kafka.connect.errors.ConnectException;
 import org.apache.kafka.connect.errors.RetriableException;
@@ -171,6 +172,7 @@ class WorkerSinkTask extends WorkerTask {
         } catch (Throwable t) {
             log.warn("Could not close transformation chain", t);
         }
+        Utils.closeQuietly(retryWithToleranceOperator, "retry operator");
     }
 
     @Override
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
index 72829e3..77df6fc 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
@@ -29,6 +29,7 @@ import org.apache.kafka.common.metrics.stats.Rate;
 import org.apache.kafka.common.metrics.stats.Total;
 import org.apache.kafka.common.metrics.stats.Value;
 import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.connect.errors.ConnectException;
 import org.apache.kafka.connect.errors.RetriableException;
 import org.apache.kafka.connect.header.Header;
@@ -165,6 +166,7 @@ class WorkerSourceTask extends WorkerTask {
         } catch (Throwable t) {
             log.warn("Could not close transformation chain", t);
         }
+        Utils.closeQuietly(retryWithToleranceOperator, "retry operator");
     }
 
     @Override
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/DeadLetterQueueReporter.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/DeadLetterQueueReporter.java
index c78026f..c8b816c 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/DeadLetterQueueReporter.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/DeadLetterQueueReporter.java
@@ -204,4 +204,9 @@ public class DeadLetterQueueReporter implements ErrorReporter {
             return null;
         }
     }
+
+    @Override
+    public void close() {
+        kafkaProducer.close();
+    }
 }
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/ErrorReporter.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/ErrorReporter.java
index 5833616..5eaa427 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/ErrorReporter.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/ErrorReporter.java
@@ -19,7 +19,7 @@ package org.apache.kafka.connect.runtime.errors;
 /**
  * Report an error using the information contained in the {@link ProcessingContext}.
  */
-public interface ErrorReporter {
+public interface ErrorReporter extends AutoCloseable {
 
     /**
      * Report an error.
@@ -28,4 +28,6 @@ public interface ErrorReporter {
      */
     void report(ProcessingContext context);
 
+    @Override
+    default void close() { }
 }
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/ProcessingContext.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/ProcessingContext.java
index f826d74..e7fb031 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/ProcessingContext.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/ProcessingContext.java
@@ -18,6 +18,7 @@ package org.apache.kafka.connect.runtime.errors;
 
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.common.record.TimestampType;
+import org.apache.kafka.connect.errors.ConnectException;
 import org.apache.kafka.connect.source.SourceRecord;
 
 import java.util.Collection;
@@ -28,7 +29,7 @@ import java.util.Objects;
  * Contains all the metadata related to the currently evaluating operation. Only one instance of this class is meant
  * to exist per task in a JVM.
  */
-class ProcessingContext {
+class ProcessingContext implements AutoCloseable {
 
     private Collection<ErrorReporter> reporters = Collections.emptyList();
 
@@ -216,4 +217,19 @@ class ProcessingContext {
         this.reporters = reporters;
     }
 
+    @Override
+    public void close() {
+        ConnectException e = null;
+        for (ErrorReporter reporter : reporters) {
+            try {
+                reporter.close();
+            } catch (Throwable t) {
+                e = e != null ? e : new ConnectException("Failed to close all reporters");
+                e.addSuppressed(t);
+            }
+        }
+        if (e != null) {
+            throw e;
+        }
+    }
 }
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperator.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperator.java
index 2513514..4e627ef 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperator.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperator.java
@@ -51,7 +51,7 @@ import java.util.concurrent.ThreadLocalRandom;
  * then it is wrapped into a ConnectException and rethrown to the caller.
  * <p>
  */
-public class RetryWithToleranceOperator {
+public class RetryWithToleranceOperator implements AutoCloseable {
 
     private static final Logger log = LoggerFactory.getLogger(RetryWithToleranceOperator.class);
 
@@ -270,4 +270,9 @@ public class RetryWithToleranceOperator {
     public boolean failed() {
         return this.context.failed();
     }
+
+    @Override
+    public void close() {
+        this.context.close();
+    }
 }
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java
index 428b3e4..bb42fc6 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.connect.runtime;
 
+import java.util.Arrays;
 import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
@@ -33,6 +34,7 @@ import org.apache.kafka.connect.errors.RetriableException;
 import org.apache.kafka.connect.json.JsonConverter;
 import org.apache.kafka.connect.runtime.distributed.ClusterConfigState;
 import org.apache.kafka.connect.runtime.errors.ErrorHandlingMetrics;
+import org.apache.kafka.connect.runtime.errors.ErrorReporter;
 import org.apache.kafka.connect.runtime.errors.LogReporter;
 import org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator;
 import org.apache.kafka.connect.runtime.errors.ToleranceType;
@@ -163,6 +165,93 @@ public class ErrorHandlingTaskTest {
     }
 
     @Test
+    public void testSinkTasksCloseErrorReporters() throws Exception {
+        ErrorReporter reporter = EasyMock.mock(ErrorReporter.class);
+
+        RetryWithToleranceOperator retryWithToleranceOperator = operator();
+        retryWithToleranceOperator.metrics(errorHandlingMetrics);
+        retryWithToleranceOperator.reporters(singletonList(reporter));
+
+        createSinkTask(initialState, retryWithToleranceOperator);
+
+        expectInitializeTask();
+        reporter.close();
+        EasyMock.expectLastCall();
+        sinkTask.stop();
+        EasyMock.expectLastCall();
+
+        consumer.close();
+        EasyMock.expectLastCall();
+
+        PowerMock.replayAll();
+
+        workerSinkTask.initialize(TASK_CONFIG);
+        workerSinkTask.initializeAndStart();
+        workerSinkTask.close();
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
+    public void testSourceTasksCloseErrorReporters() {
+        ErrorReporter reporter = EasyMock.mock(ErrorReporter.class);
+
+        RetryWithToleranceOperator retryWithToleranceOperator = operator();
+        retryWithToleranceOperator.metrics(errorHandlingMetrics);
+        retryWithToleranceOperator.reporters(singletonList(reporter));
+
+        createSourceTask(initialState, retryWithToleranceOperator);
+
+        sourceTask.stop();
+        PowerMock.expectLastCall();
+
+        producer.close(EasyMock.anyObject());
+        PowerMock.expectLastCall();
+
+        reporter.close();
+        EasyMock.expectLastCall();
+
+        PowerMock.replayAll();
+
+        workerSourceTask.initialize(TASK_CONFIG);
+        workerSourceTask.close();
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
+    public void testCloseErrorReportersExceptionPropagation() {
+        ErrorReporter reporterA = EasyMock.mock(ErrorReporter.class);
+        ErrorReporter reporterB = EasyMock.mock(ErrorReporter.class);
+
+        RetryWithToleranceOperator retryWithToleranceOperator = operator();
+        retryWithToleranceOperator.metrics(errorHandlingMetrics);
+        retryWithToleranceOperator.reporters(Arrays.asList(reporterA, reporterB));
+
+        createSourceTask(initialState, retryWithToleranceOperator);
+
+        sourceTask.stop();
+        PowerMock.expectLastCall();
+
+        producer.close(EasyMock.anyObject());
+        PowerMock.expectLastCall();
+
+        // Even though the reporters throw exceptions, they should both still be closed.
+        reporterA.close();
+        EasyMock.expectLastCall().andThrow(new RuntimeException());
+
+        reporterB.close();
+        EasyMock.expectLastCall().andThrow(new RuntimeException());
+
+        PowerMock.replayAll();
+
+        workerSourceTask.initialize(TASK_CONFIG);
+        workerSourceTask.close();
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
     public void testErrorHandlingInSinkTasks() throws Exception {
         Map<String, String> reportProps = new HashMap<>();
         reportProps.put(ConnectorConfig.ERRORS_LOG_ENABLE_CONFIG, "true");
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/errors/ErrorReporterTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/errors/ErrorReporterTest.java
index 00a922f..f01cd49 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/errors/ErrorReporterTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/errors/ErrorReporterTest.java
@@ -147,6 +147,20 @@ public class ErrorReporterTest {
     }
 
     @Test
+    public void testCloseDLQ() {
+        DeadLetterQueueReporter deadLetterQueueReporter = new DeadLetterQueueReporter(
+            producer, config(singletonMap(SinkConnectorConfig.DLQ_TOPIC_NAME_CONFIG, DLQ_TOPIC)), TASK_ID, errorHandlingMetrics);
+
+        producer.close();
+        EasyMock.expectLastCall();
+        replay(producer);
+
+        deadLetterQueueReporter.close();
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
     public void testLogOnDisabledLogReporter() {
         LogReporter logReporter = new LogReporter(TASK_ID, config(emptyMap()), errorHandlingMetrics);