You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by kk...@apache.org on 2020/05/06 17:25:23 UTC

[kafka] branch 2.3 updated (ba95a5f -> a426745)

This is an automated email from the ASF dual-hosted git repository.

kkarantasis pushed a change to branch 2.3
in repository https://gitbox.apache.org/repos/asf/kafka.git.


    from ba95a5f  MINOR: Fix broken JMX link in docs by adding missing starting double quote (#8587)
     new 4af11de  KAFKA-9830: Implement AutoCloseable in ErrorReporter and subclasses (#8442)
     new a426745  KAFKA-9919: Add logging to KafkaBasedLog::readToLogEnd (#8554)

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../kafka/connect/runtime/WorkerSinkTask.java      |  2 +
 .../kafka/connect/runtime/WorkerSourceTask.java    |  2 +
 .../runtime/errors/DeadLetterQueueReporter.java    |  5 ++
 .../connect/runtime/errors/ErrorReporter.java      |  4 +-
 .../connect/runtime/errors/ProcessingContext.java  | 18 ++++-
 .../runtime/errors/RetryWithToleranceOperator.java |  7 +-
 .../apache/kafka/connect/util/KafkaBasedLog.java   | 12 ++-
 .../connect/runtime/ErrorHandlingTaskTest.java     | 89 ++++++++++++++++++++++
 .../connect/runtime/errors/ErrorReporterTest.java  | 14 ++++
 9 files changed, 147 insertions(+), 6 deletions(-)


[kafka] 02/02: KAFKA-9919: Add logging to KafkaBasedLog::readToLogEnd (#8554)

Posted by kk...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

kkarantasis pushed a commit to branch 2.3
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit a42674552ce031a4f1a7fcf6d9433cfa80b2a8b1
Author: Chris Egerton <ch...@confluent.io>
AuthorDate: Wed Apr 29 20:42:13 2020 -0700

    KAFKA-9919: Add logging to KafkaBasedLog::readToLogEnd (#8554)
    
    Simple logging additions at TRACE level that should help when the worker can't get caught up to the end of an internal topic.
    
    Reviewers: Gwen Shapira <cs...@gmail.com>, Aakash Shah <as...@confluent.io>, Konstantine Karantasis <ko...@confluent.io>
---
 .../java/org/apache/kafka/connect/util/KafkaBasedLog.java    | 12 +++++++++---
 1 file changed, 9 insertions(+), 3 deletions(-)

diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java b/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
index e78276a..e301581 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
@@ -281,9 +281,15 @@ public class KafkaBasedLog<K, V> {
             Iterator<Map.Entry<TopicPartition, Long>> it = endOffsets.entrySet().iterator();
             while (it.hasNext()) {
                 Map.Entry<TopicPartition, Long> entry = it.next();
-                if (consumer.position(entry.getKey()) >= entry.getValue())
+                TopicPartition topicPartition = entry.getKey();
+                long endOffset = entry.getValue();
+                long lastConsumedOffset = consumer.position(topicPartition);
+                if (lastConsumedOffset >= endOffset) {
+                    log.trace("Read to end offset {} for {}", endOffset, topicPartition);
                     it.remove();
-                else {
+                } else {
+                    log.trace("Behind end offset {} for {}; last-read offset is {}",
+                            endOffset, topicPartition, lastConsumedOffset);
                     poll(Integer.MAX_VALUE);
                     break;
                 }
@@ -345,4 +351,4 @@ public class KafkaBasedLog<K, V> {
             }
         }
     }
-}
\ No newline at end of file
+}


[kafka] 01/02: KAFKA-9830: Implement AutoCloseable in ErrorReporter and subclasses (#8442)

Posted by kk...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

kkarantasis pushed a commit to branch 2.3
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit 4af11dec94a09542b7f4342e68f68ae93b4be120
Author: Greg Harris <gr...@confluent.io>
AuthorDate: Wed Apr 29 17:07:01 2020 -0700

    KAFKA-9830: Implement AutoCloseable in ErrorReporter and subclasses (#8442)
    
    * The DeadLetterQueueReporter has a KafkaProducer that it must close to clean up resources
    * Currently, the producer and its threads are leaked every time a task is stopped
    * Responsibility for cleaning up ErrorReporters is transitively assigned to the
    ProcessingContext, RetryWithToleranceOperator, and WorkerSinkTask/WorkerSourceTask classes
    * One new unit test in ErrorReporterTest asserts that the producer is closed by the dlq reporter
    
    Reviewers: Arjun Satish <ar...@confluent.io>, Chris Egerton <ch...@confluent.io>, Chia-Ping Tsai <ch...@gmail.com>, Konstantine Karantasis <ko...@confluent.io>
---
 .../kafka/connect/runtime/WorkerSinkTask.java      |  2 +
 .../kafka/connect/runtime/WorkerSourceTask.java    |  2 +
 .../runtime/errors/DeadLetterQueueReporter.java    |  5 ++
 .../connect/runtime/errors/ErrorReporter.java      |  4 +-
 .../connect/runtime/errors/ProcessingContext.java  | 18 ++++-
 .../runtime/errors/RetryWithToleranceOperator.java |  7 +-
 .../connect/runtime/ErrorHandlingTaskTest.java     | 89 ++++++++++++++++++++++
 .../connect/runtime/errors/ErrorReporterTest.java  | 14 ++++
 8 files changed, 138 insertions(+), 3 deletions(-)

diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
index 067d604..77e5848 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
@@ -32,6 +32,7 @@ import org.apache.kafka.common.metrics.stats.Rate;
 import org.apache.kafka.common.metrics.stats.Total;
 import org.apache.kafka.common.metrics.stats.Value;
 import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.connect.data.SchemaAndValue;
 import org.apache.kafka.connect.errors.ConnectException;
 import org.apache.kafka.connect.errors.RetriableException;
@@ -171,6 +172,7 @@ class WorkerSinkTask extends WorkerTask {
         } catch (Throwable t) {
             log.warn("Could not close transformation chain", t);
         }
+        Utils.closeQuietly(retryWithToleranceOperator, "retry operator");
     }
 
     @Override
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
index 72829e3..77df6fc 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
@@ -29,6 +29,7 @@ import org.apache.kafka.common.metrics.stats.Rate;
 import org.apache.kafka.common.metrics.stats.Total;
 import org.apache.kafka.common.metrics.stats.Value;
 import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.connect.errors.ConnectException;
 import org.apache.kafka.connect.errors.RetriableException;
 import org.apache.kafka.connect.header.Header;
@@ -165,6 +166,7 @@ class WorkerSourceTask extends WorkerTask {
         } catch (Throwable t) {
             log.warn("Could not close transformation chain", t);
         }
+        Utils.closeQuietly(retryWithToleranceOperator, "retry operator");
     }
 
     @Override
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/DeadLetterQueueReporter.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/DeadLetterQueueReporter.java
index c78026f..c8b816c 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/DeadLetterQueueReporter.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/DeadLetterQueueReporter.java
@@ -204,4 +204,9 @@ public class DeadLetterQueueReporter implements ErrorReporter {
             return null;
         }
     }
+
+    @Override
+    public void close() {
+        kafkaProducer.close();
+    }
 }
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/ErrorReporter.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/ErrorReporter.java
index 5833616..5eaa427 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/ErrorReporter.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/ErrorReporter.java
@@ -19,7 +19,7 @@ package org.apache.kafka.connect.runtime.errors;
 /**
  * Report an error using the information contained in the {@link ProcessingContext}.
  */
-public interface ErrorReporter {
+public interface ErrorReporter extends AutoCloseable {
 
     /**
      * Report an error.
@@ -28,4 +28,6 @@ public interface ErrorReporter {
      */
     void report(ProcessingContext context);
 
+    @Override
+    default void close() { }
 }
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/ProcessingContext.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/ProcessingContext.java
index f826d74..e7fb031 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/ProcessingContext.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/ProcessingContext.java
@@ -18,6 +18,7 @@ package org.apache.kafka.connect.runtime.errors;
 
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.common.record.TimestampType;
+import org.apache.kafka.connect.errors.ConnectException;
 import org.apache.kafka.connect.source.SourceRecord;
 
 import java.util.Collection;
@@ -28,7 +29,7 @@ import java.util.Objects;
  * Contains all the metadata related to the currently evaluating operation. Only one instance of this class is meant
  * to exist per task in a JVM.
  */
-class ProcessingContext {
+class ProcessingContext implements AutoCloseable {
 
     private Collection<ErrorReporter> reporters = Collections.emptyList();
 
@@ -216,4 +217,19 @@ class ProcessingContext {
         this.reporters = reporters;
     }
 
+    @Override
+    public void close() {
+        ConnectException e = null;
+        for (ErrorReporter reporter : reporters) {
+            try {
+                reporter.close();
+            } catch (Throwable t) {
+                e = e != null ? e : new ConnectException("Failed to close all reporters");
+                e.addSuppressed(t);
+            }
+        }
+        if (e != null) {
+            throw e;
+        }
+    }
 }
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperator.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperator.java
index 2513514..4e627ef 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperator.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperator.java
@@ -51,7 +51,7 @@ import java.util.concurrent.ThreadLocalRandom;
  * then it is wrapped into a ConnectException and rethrown to the caller.
  * <p>
  */
-public class RetryWithToleranceOperator {
+public class RetryWithToleranceOperator implements AutoCloseable {
 
     private static final Logger log = LoggerFactory.getLogger(RetryWithToleranceOperator.class);
 
@@ -270,4 +270,9 @@ public class RetryWithToleranceOperator {
     public boolean failed() {
         return this.context.failed();
     }
+
+    @Override
+    public void close() {
+        this.context.close();
+    }
 }
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java
index 428b3e4..bb42fc6 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.connect.runtime;
 
+import java.util.Arrays;
 import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
@@ -33,6 +34,7 @@ import org.apache.kafka.connect.errors.RetriableException;
 import org.apache.kafka.connect.json.JsonConverter;
 import org.apache.kafka.connect.runtime.distributed.ClusterConfigState;
 import org.apache.kafka.connect.runtime.errors.ErrorHandlingMetrics;
+import org.apache.kafka.connect.runtime.errors.ErrorReporter;
 import org.apache.kafka.connect.runtime.errors.LogReporter;
 import org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator;
 import org.apache.kafka.connect.runtime.errors.ToleranceType;
@@ -163,6 +165,93 @@ public class ErrorHandlingTaskTest {
     }
 
     @Test
+    public void testSinkTasksCloseErrorReporters() throws Exception {
+        ErrorReporter reporter = EasyMock.mock(ErrorReporter.class);
+
+        RetryWithToleranceOperator retryWithToleranceOperator = operator();
+        retryWithToleranceOperator.metrics(errorHandlingMetrics);
+        retryWithToleranceOperator.reporters(singletonList(reporter));
+
+        createSinkTask(initialState, retryWithToleranceOperator);
+
+        expectInitializeTask();
+        reporter.close();
+        EasyMock.expectLastCall();
+        sinkTask.stop();
+        EasyMock.expectLastCall();
+
+        consumer.close();
+        EasyMock.expectLastCall();
+
+        PowerMock.replayAll();
+
+        workerSinkTask.initialize(TASK_CONFIG);
+        workerSinkTask.initializeAndStart();
+        workerSinkTask.close();
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
+    public void testSourceTasksCloseErrorReporters() {
+        ErrorReporter reporter = EasyMock.mock(ErrorReporter.class);
+
+        RetryWithToleranceOperator retryWithToleranceOperator = operator();
+        retryWithToleranceOperator.metrics(errorHandlingMetrics);
+        retryWithToleranceOperator.reporters(singletonList(reporter));
+
+        createSourceTask(initialState, retryWithToleranceOperator);
+
+        sourceTask.stop();
+        PowerMock.expectLastCall();
+
+        producer.close(EasyMock.anyObject());
+        PowerMock.expectLastCall();
+
+        reporter.close();
+        EasyMock.expectLastCall();
+
+        PowerMock.replayAll();
+
+        workerSourceTask.initialize(TASK_CONFIG);
+        workerSourceTask.close();
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
+    public void testCloseErrorReportersExceptionPropagation() {
+        ErrorReporter reporterA = EasyMock.mock(ErrorReporter.class);
+        ErrorReporter reporterB = EasyMock.mock(ErrorReporter.class);
+
+        RetryWithToleranceOperator retryWithToleranceOperator = operator();
+        retryWithToleranceOperator.metrics(errorHandlingMetrics);
+        retryWithToleranceOperator.reporters(Arrays.asList(reporterA, reporterB));
+
+        createSourceTask(initialState, retryWithToleranceOperator);
+
+        sourceTask.stop();
+        PowerMock.expectLastCall();
+
+        producer.close(EasyMock.anyObject());
+        PowerMock.expectLastCall();
+
+        // Even though the reporters throw exceptions, they should both still be closed.
+        reporterA.close();
+        EasyMock.expectLastCall().andThrow(new RuntimeException());
+
+        reporterB.close();
+        EasyMock.expectLastCall().andThrow(new RuntimeException());
+
+        PowerMock.replayAll();
+
+        workerSourceTask.initialize(TASK_CONFIG);
+        workerSourceTask.close();
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
     public void testErrorHandlingInSinkTasks() throws Exception {
         Map<String, String> reportProps = new HashMap<>();
         reportProps.put(ConnectorConfig.ERRORS_LOG_ENABLE_CONFIG, "true");
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/errors/ErrorReporterTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/errors/ErrorReporterTest.java
index 00a922f..f01cd49 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/errors/ErrorReporterTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/errors/ErrorReporterTest.java
@@ -147,6 +147,20 @@ public class ErrorReporterTest {
     }
 
     @Test
+    public void testCloseDLQ() {
+        DeadLetterQueueReporter deadLetterQueueReporter = new DeadLetterQueueReporter(
+            producer, config(singletonMap(SinkConnectorConfig.DLQ_TOPIC_NAME_CONFIG, DLQ_TOPIC)), TASK_ID, errorHandlingMetrics);
+
+        producer.close();
+        EasyMock.expectLastCall();
+        replay(producer);
+
+        deadLetterQueueReporter.close();
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
     public void testLogOnDisabledLogReporter() {
         LogReporter logReporter = new LogReporter(TASK_ID, config(emptyMap()), errorHandlingMetrics);