You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by kk...@apache.org on 2020/05/06 17:25:24 UTC
[kafka] 01/02: KAFKA-9830: Implement AutoCloseable in ErrorReporter
and subclasses (#8442)
This is an automated email from the ASF dual-hosted git repository.
kkarantasis pushed a commit to branch 2.3
in repository https://gitbox.apache.org/repos/asf/kafka.git
commit 4af11dec94a09542b7f4342e68f68ae93b4be120
Author: Greg Harris <gr...@confluent.io>
AuthorDate: Wed Apr 29 17:07:01 2020 -0700
KAFKA-9830: Implement AutoCloseable in ErrorReporter and subclasses (#8442)
* The DeadLetterQueueReporter has a KafkaProducer that it must close to clean up resources
* Currently, the producer and its threads are leaked every time a task is stopped
* Responsibility for cleaning up ErrorReporters is transitively assigned to the
ProcessingContext, RetryWithToleranceOperator, and WorkerSinkTask/WorkerSinkTask classes
* One new unit test in ErrorReporterTest asserts that the producer is closed by the dlq reporter
Reviewers: Arjun Satish <ar...@confluent.io>, Chris Egerton <ch...@confluent.io>, Chia-Ping Tsai <ch...@gmail.com>, Konstantine Karantasis <ko...@confluent.io>
---
.../kafka/connect/runtime/WorkerSinkTask.java | 2 +
.../kafka/connect/runtime/WorkerSourceTask.java | 2 +
.../runtime/errors/DeadLetterQueueReporter.java | 5 ++
.../connect/runtime/errors/ErrorReporter.java | 4 +-
.../connect/runtime/errors/ProcessingContext.java | 18 ++++-
.../runtime/errors/RetryWithToleranceOperator.java | 7 +-
.../connect/runtime/ErrorHandlingTaskTest.java | 89 ++++++++++++++++++++++
.../connect/runtime/errors/ErrorReporterTest.java | 14 ++++
8 files changed, 138 insertions(+), 3 deletions(-)
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
index 067d604..77e5848 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
@@ -32,6 +32,7 @@ import org.apache.kafka.common.metrics.stats.Rate;
import org.apache.kafka.common.metrics.stats.Total;
import org.apache.kafka.common.metrics.stats.Value;
import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.errors.RetriableException;
@@ -171,6 +172,7 @@ class WorkerSinkTask extends WorkerTask {
} catch (Throwable t) {
log.warn("Could not close transformation chain", t);
}
+ Utils.closeQuietly(retryWithToleranceOperator, "retry operator");
}
@Override
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
index 72829e3..77df6fc 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
@@ -29,6 +29,7 @@ import org.apache.kafka.common.metrics.stats.Rate;
import org.apache.kafka.common.metrics.stats.Total;
import org.apache.kafka.common.metrics.stats.Value;
import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.errors.RetriableException;
import org.apache.kafka.connect.header.Header;
@@ -165,6 +166,7 @@ class WorkerSourceTask extends WorkerTask {
} catch (Throwable t) {
log.warn("Could not close transformation chain", t);
}
+ Utils.closeQuietly(retryWithToleranceOperator, "retry operator");
}
@Override
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/DeadLetterQueueReporter.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/DeadLetterQueueReporter.java
index c78026f..c8b816c 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/DeadLetterQueueReporter.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/DeadLetterQueueReporter.java
@@ -204,4 +204,9 @@ public class DeadLetterQueueReporter implements ErrorReporter {
return null;
}
}
+
+ @Override
+ public void close() {
+ kafkaProducer.close();
+ }
}
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/ErrorReporter.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/ErrorReporter.java
index 5833616..5eaa427 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/ErrorReporter.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/ErrorReporter.java
@@ -19,7 +19,7 @@ package org.apache.kafka.connect.runtime.errors;
/**
* Report an error using the information contained in the {@link ProcessingContext}.
*/
-public interface ErrorReporter {
+public interface ErrorReporter extends AutoCloseable {
/**
* Report an error.
@@ -28,4 +28,6 @@ public interface ErrorReporter {
*/
void report(ProcessingContext context);
+ @Override
+ default void close() { }
}
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/ProcessingContext.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/ProcessingContext.java
index f826d74..e7fb031 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/ProcessingContext.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/ProcessingContext.java
@@ -18,6 +18,7 @@ package org.apache.kafka.connect.runtime.errors;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.record.TimestampType;
+import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.source.SourceRecord;
import java.util.Collection;
@@ -28,7 +29,7 @@ import java.util.Objects;
* Contains all the metadata related to the currently evaluating operation. Only one instance of this class is meant
* to exist per task in a JVM.
*/
-class ProcessingContext {
+class ProcessingContext implements AutoCloseable {
private Collection<ErrorReporter> reporters = Collections.emptyList();
@@ -216,4 +217,19 @@ class ProcessingContext {
this.reporters = reporters;
}
+ @Override
+ public void close() {
+ ConnectException e = null;
+ for (ErrorReporter reporter : reporters) {
+ try {
+ reporter.close();
+ } catch (Throwable t) {
+ e = e != null ? e : new ConnectException("Failed to close all reporters");
+ e.addSuppressed(t);
+ }
+ }
+ if (e != null) {
+ throw e;
+ }
+ }
}
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperator.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperator.java
index 2513514..4e627ef 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperator.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperator.java
@@ -51,7 +51,7 @@ import java.util.concurrent.ThreadLocalRandom;
* then it is wrapped into a ConnectException and rethrown to the caller.
* <p>
*/
-public class RetryWithToleranceOperator {
+public class RetryWithToleranceOperator implements AutoCloseable {
private static final Logger log = LoggerFactory.getLogger(RetryWithToleranceOperator.class);
@@ -270,4 +270,9 @@ public class RetryWithToleranceOperator {
public boolean failed() {
return this.context.failed();
}
+
+ @Override
+ public void close() {
+ this.context.close();
+ }
}
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java
index 428b3e4..bb42fc6 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java
@@ -16,6 +16,7 @@
*/
package org.apache.kafka.connect.runtime;
+import java.util.Arrays;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
@@ -33,6 +34,7 @@ import org.apache.kafka.connect.errors.RetriableException;
import org.apache.kafka.connect.json.JsonConverter;
import org.apache.kafka.connect.runtime.distributed.ClusterConfigState;
import org.apache.kafka.connect.runtime.errors.ErrorHandlingMetrics;
+import org.apache.kafka.connect.runtime.errors.ErrorReporter;
import org.apache.kafka.connect.runtime.errors.LogReporter;
import org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator;
import org.apache.kafka.connect.runtime.errors.ToleranceType;
@@ -163,6 +165,93 @@ public class ErrorHandlingTaskTest {
}
@Test
+ public void testSinkTasksCloseErrorReporters() throws Exception {
+ ErrorReporter reporter = EasyMock.mock(ErrorReporter.class);
+
+ RetryWithToleranceOperator retryWithToleranceOperator = operator();
+ retryWithToleranceOperator.metrics(errorHandlingMetrics);
+ retryWithToleranceOperator.reporters(singletonList(reporter));
+
+ createSinkTask(initialState, retryWithToleranceOperator);
+
+ expectInitializeTask();
+ reporter.close();
+ EasyMock.expectLastCall();
+ sinkTask.stop();
+ EasyMock.expectLastCall();
+
+ consumer.close();
+ EasyMock.expectLastCall();
+
+ PowerMock.replayAll();
+
+ workerSinkTask.initialize(TASK_CONFIG);
+ workerSinkTask.initializeAndStart();
+ workerSinkTask.close();
+
+ PowerMock.verifyAll();
+ }
+
+ @Test
+ public void testSourceTasksCloseErrorReporters() {
+ ErrorReporter reporter = EasyMock.mock(ErrorReporter.class);
+
+ RetryWithToleranceOperator retryWithToleranceOperator = operator();
+ retryWithToleranceOperator.metrics(errorHandlingMetrics);
+ retryWithToleranceOperator.reporters(singletonList(reporter));
+
+ createSourceTask(initialState, retryWithToleranceOperator);
+
+ sourceTask.stop();
+ PowerMock.expectLastCall();
+
+ producer.close(EasyMock.anyObject());
+ PowerMock.expectLastCall();
+
+ reporter.close();
+ EasyMock.expectLastCall();
+
+ PowerMock.replayAll();
+
+ workerSourceTask.initialize(TASK_CONFIG);
+ workerSourceTask.close();
+
+ PowerMock.verifyAll();
+ }
+
+ @Test
+ public void testCloseErrorReportersExceptionPropagation() {
+ ErrorReporter reporterA = EasyMock.mock(ErrorReporter.class);
+ ErrorReporter reporterB = EasyMock.mock(ErrorReporter.class);
+
+ RetryWithToleranceOperator retryWithToleranceOperator = operator();
+ retryWithToleranceOperator.metrics(errorHandlingMetrics);
+ retryWithToleranceOperator.reporters(Arrays.asList(reporterA, reporterB));
+
+ createSourceTask(initialState, retryWithToleranceOperator);
+
+ sourceTask.stop();
+ PowerMock.expectLastCall();
+
+ producer.close(EasyMock.anyObject());
+ PowerMock.expectLastCall();
+
+ // Even though the reporters throw exceptions, they should both still be closed.
+ reporterA.close();
+ EasyMock.expectLastCall().andThrow(new RuntimeException());
+
+ reporterB.close();
+ EasyMock.expectLastCall().andThrow(new RuntimeException());
+
+ PowerMock.replayAll();
+
+ workerSourceTask.initialize(TASK_CONFIG);
+ workerSourceTask.close();
+
+ PowerMock.verifyAll();
+ }
+
+ @Test
public void testErrorHandlingInSinkTasks() throws Exception {
Map<String, String> reportProps = new HashMap<>();
reportProps.put(ConnectorConfig.ERRORS_LOG_ENABLE_CONFIG, "true");
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/errors/ErrorReporterTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/errors/ErrorReporterTest.java
index 00a922f..f01cd49 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/errors/ErrorReporterTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/errors/ErrorReporterTest.java
@@ -147,6 +147,20 @@ public class ErrorReporterTest {
}
@Test
+ public void testCloseDLQ() {
+ DeadLetterQueueReporter deadLetterQueueReporter = new DeadLetterQueueReporter(
+ producer, config(singletonMap(SinkConnectorConfig.DLQ_TOPIC_NAME_CONFIG, DLQ_TOPIC)), TASK_ID, errorHandlingMetrics);
+
+ producer.close();
+ EasyMock.expectLastCall();
+ replay(producer);
+
+ deadLetterQueueReporter.close();
+
+ PowerMock.verifyAll();
+ }
+
+ @Test
public void testLogOnDisabledLogReporter() {
LogReporter logReporter = new LogReporter(TASK_ID, config(emptyMap()), errorHandlingMetrics);