You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by kk...@apache.org on 2020/05/06 17:24:44 UTC

[kafka] branch 2.4 updated (d76883c -> ad7030b)

This is an automated email from the ASF dual-hosted git repository.

kkarantasis pushed a change to branch 2.4
in repository https://gitbox.apache.org/repos/asf/kafka.git.


    from d76883c  MINOR: Fix unused arguments used in formatted string and log messages (#8036)
     new 4e32bc8  KAFKA-9830: Implement AutoCloseable in ErrorReporter and subclasses (#8442)
     new 7a72a2a  KAFKA-9919: Add logging to KafkaBasedLog::readToLogEnd (#8554)
     new ad7030b  KAFKA-9633: Ensure ConfigProviders are closed (#8204)

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../apache/kafka/common/config/AbstractConfig.java |  1 +
 .../kafka/common/config/AbstractConfigTest.java    | 18 ++++-
 .../config/provider/MockFileConfigProvider.java    | 35 +++++++++
 .../org/apache/kafka/connect/runtime/Worker.java   | 16 ++--
 .../connect/runtime/WorkerConfigTransformer.java   | 10 ++-
 .../kafka/connect/runtime/WorkerSinkTask.java      |  2 +
 .../kafka/connect/runtime/WorkerSourceTask.java    |  2 +
 .../runtime/errors/DeadLetterQueueReporter.java    |  5 ++
 .../connect/runtime/errors/ErrorReporter.java      |  4 +-
 .../connect/runtime/errors/ProcessingContext.java  | 18 ++++-
 .../runtime/errors/RetryWithToleranceOperator.java |  7 +-
 .../apache/kafka/connect/util/KafkaBasedLog.java   | 12 ++-
 .../connect/runtime/ErrorHandlingTaskTest.java     | 89 ++++++++++++++++++++++
 .../apache/kafka/connect/runtime/WorkerTest.java   | 33 +++++++-
 .../connect/runtime/errors/ErrorReporterTest.java  | 14 ++++
 15 files changed, 249 insertions(+), 17 deletions(-)


[kafka] 03/03: KAFKA-9633: Ensure ConfigProviders are closed (#8204)

Posted by kk...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

kkarantasis pushed a commit to branch 2.4
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit ad7030bd658b6037842f398c2787a4dedd75e321
Author: Tom Bentley <to...@users.noreply.github.com>
AuthorDate: Thu Apr 30 18:33:10 2020 +0100

    KAFKA-9633: Ensure ConfigProviders are closed (#8204)
    
    ConfigProvider extends Closeable, but were not closed in the following contexts:
    * AbstractConfig
    * WorkerConfigTransformer
    * Worker
    
    This commit ensures that ConfigProviders are close in the above contexts.
    
    It also adds MockFileConfigProvider.assertClosed()
    Gradle executes test classes concurrently, so MockFileConfigProvider
    can't simply use a static field to hold its closure state.
    Instead use a protocol whereby the MockFileConfigProvider is configured
    with some unique ket identifying the test which also used when calling
    assertClosed().
    
    Reviewers: Konstantine Karantasis <ko...@confluent.io>
---
 .../apache/kafka/common/config/AbstractConfig.java |  1 +
 .../kafka/common/config/AbstractConfigTest.java    | 18 ++++++++++-
 .../config/provider/MockFileConfigProvider.java    | 35 ++++++++++++++++++++++
 .../org/apache/kafka/connect/runtime/Worker.java   | 16 +++++-----
 .../connect/runtime/WorkerConfigTransformer.java   | 10 ++++++-
 .../apache/kafka/connect/runtime/WorkerTest.java   | 33 ++++++++++++++++++--
 6 files changed, 102 insertions(+), 11 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java b/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
index 3992a41..e91a9a5 100644
--- a/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
+++ b/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
@@ -483,6 +483,7 @@ public class AbstractConfig {
                 resolvedOriginals.putAll(result.data());
             }
         }
+        providers.values().forEach(x -> Utils.closeQuietly(x, "config provider"));
 
         return new ResolvingMap<>(resolvedOriginals, originals);
     }
diff --git a/clients/src/test/java/org/apache/kafka/common/config/AbstractConfigTest.java b/clients/src/test/java/org/apache/kafka/common/config/AbstractConfigTest.java
index b5fff6f..834278b 100644
--- a/clients/src/test/java/org/apache/kafka/common/config/AbstractConfigTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/config/AbstractConfigTest.java
@@ -34,6 +34,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
+import java.util.UUID;
 
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNull;
@@ -340,6 +341,8 @@ public class AbstractConfigTest {
         // Test Case: Valid Test Case for ConfigProviders as part of config.properties
         props.put("config.providers", "file");
         props.put("config.providers.file.class", MockFileConfigProvider.class.getName());
+        String id = UUID.randomUUID().toString();
+        props.put("config.providers.file.param.testId", id);
         props.put("prefix.ssl.truststore.location.number", 5);
         props.put("sasl.kerberos.service.name", "service name");
         props.put("sasl.kerberos.key", "${file:/usr/kerberos:key}");
@@ -349,6 +352,7 @@ public class AbstractConfigTest {
         assertEquals(config.originals().get("sasl.kerberos.password"), "randomPassword");
         assertEquals(config.originals().get("prefix.ssl.truststore.location.number"), 5);
         assertEquals(config.originals().get("sasl.kerberos.service.name"), "service name");
+        MockFileConfigProvider.assertClosed(id);
     }
 
     @Test
@@ -357,12 +361,15 @@ public class AbstractConfigTest {
         Properties providers = new Properties();
         providers.put("config.providers", "file");
         providers.put("config.providers.file.class", MockFileConfigProvider.class.getName());
+        String id = UUID.randomUUID().toString();
+        providers.put("config.providers.file.param.testId", id);
         Properties props = new Properties();
         props.put("sasl.kerberos.key", "${file:/usr/kerberos:key}");
         props.put("sasl.kerberos.password", "${file:/usr/kerberos:password}");
         TestIndirectConfigResolution config = new TestIndirectConfigResolution(props, convertPropertiesToMap(providers));
         assertEquals(config.originals().get("sasl.kerberos.key"), "testKey");
         assertEquals(config.originals().get("sasl.kerberos.password"), "randomPassword");
+        MockFileConfigProvider.assertClosed(id);
     }
 
     @Test
@@ -370,13 +377,16 @@ public class AbstractConfigTest {
         // Test Case: Valid Test Case for ConfigProviders as a separate variable
         Properties providers = new Properties();
         providers.put("config.providers", "file");
-        providers.put("config.providers.file.class", "org.apache.kafka.common.config.provider.MockFileConfigProvider");
+        providers.put("config.providers.file.class", MockFileConfigProvider.class.getName());
+        String id = UUID.randomUUID().toString();
+        providers.put("config.providers.file.param.testId", id);
         Properties props = new Properties();
         props.put("sasl.kerberos.key", "${file:/usr/kerberos:key}");
         Map<?, ?> immutableMap = Collections.unmodifiableMap(props);
         Map<String, ?> provMap = convertPropertiesToMap(providers);
         TestIndirectConfigResolution config = new TestIndirectConfigResolution(immutableMap, provMap);
         assertEquals(config.originals().get("sasl.kerberos.key"), "testKey");
+        MockFileConfigProvider.assertClosed(id);
     }
 
     @Test
@@ -385,6 +395,8 @@ public class AbstractConfigTest {
         Properties providers = new Properties();
         providers.put("config.providers", "file,vault");
         providers.put("config.providers.file.class", MockFileConfigProvider.class.getName());
+        String id = UUID.randomUUID().toString();
+        providers.put("config.providers.file.param.testId", id);
         providers.put("config.providers.vault.class", MockVaultConfigProvider.class.getName());
         Properties props = new Properties();
         props.put("sasl.kerberos.key", "${file:/usr/kerberos:key}");
@@ -396,6 +408,7 @@ public class AbstractConfigTest {
         assertEquals(config.originals().get("sasl.kerberos.password"), "randomPassword");
         assertEquals(config.originals().get("sasl.truststore.key"), "testTruststoreKey");
         assertEquals(config.originals().get("sasl.truststore.password"), "randomtruststorePassword");
+        MockFileConfigProvider.assertClosed(id);
     }
 
     @Test
@@ -429,9 +442,12 @@ public class AbstractConfigTest {
         Properties props = new Properties();
         props.put("config.providers", "test");
         props.put("config.providers.test.class", MockFileConfigProvider.class.getName());
+        String id = UUID.randomUUID().toString();
+        props.put("config.providers.test.param.testId", id);
         props.put("random", "${test:/foo/bar/testpath:random}");
         TestIndirectConfigResolution config = new TestIndirectConfigResolution(props);
         assertEquals(config.originals().get("random"), "${test:/foo/bar/testpath:random}");
+        MockFileConfigProvider.assertClosed(id);
     }
 
     @Test
diff --git a/clients/src/test/java/org/apache/kafka/common/config/provider/MockFileConfigProvider.java b/clients/src/test/java/org/apache/kafka/common/config/provider/MockFileConfigProvider.java
index e779cbe..3409096 100644
--- a/clients/src/test/java/org/apache/kafka/common/config/provider/MockFileConfigProvider.java
+++ b/clients/src/test/java/org/apache/kafka/common/config/provider/MockFileConfigProvider.java
@@ -19,11 +19,46 @@ package org.apache.kafka.common.config.provider;
 import java.io.IOException;
 import java.io.Reader;
 import java.io.StringReader;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
 
 public class MockFileConfigProvider extends FileConfigProvider {
 
+    private static final Map<String, MockFileConfigProvider> INSTANCES = Collections.synchronizedMap(new HashMap<>());
+    private String id;
+    private boolean closed = false;
+
+    public void configure(Map<String, ?> configs) {
+        Object id = configs.get("testId");
+        if (id == null) {
+            throw new RuntimeException(getClass().getName() + " missing 'testId' config");
+        }
+        if (this.id != null) {
+            throw new RuntimeException(getClass().getName() + " instance was configured twice");
+        }
+        this.id = id.toString();
+        INSTANCES.put(id.toString(), this);
+    }
+
     @Override
     protected Reader reader(String path) throws IOException {
         return new StringReader("key=testKey\npassword=randomPassword");
     }
+
+    @Override
+    public synchronized void close() {
+        closed = true;
+    }
+
+    public static void assertClosed(String id) {
+        MockFileConfigProvider instance = INSTANCES.remove(id);
+        assertNotNull(instance);
+        synchronized (instance) {
+            assertTrue(instance.closed);
+        }
+    }
 }
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
index 3c4ece7..3cf4d01 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
@@ -104,8 +104,8 @@ public class Worker {
     private final ConcurrentMap<String, WorkerConnector> connectors = new ConcurrentHashMap<>();
     private final ConcurrentMap<ConnectorTaskId, WorkerTask> tasks = new ConcurrentHashMap<>();
     private SourceTaskOffsetCommitter sourceTaskOffsetCommitter;
-    private WorkerConfigTransformer workerConfigTransformer;
-    private ConnectorClientConfigOverridePolicy connectorClientConfigOverridePolicy;
+    private final WorkerConfigTransformer workerConfigTransformer;
+    private final ConnectorClientConfigOverridePolicy connectorClientConfigOverridePolicy;
 
     public Worker(
         String workerId,
@@ -220,6 +220,8 @@ public class Worker {
 
         workerMetricsGroup.close();
         connectorStatusMetricsGroup.close();
+
+        workerConfigTransformer.close();
     }
 
     /**
@@ -854,11 +856,11 @@ public class Worker {
     }
 
     static class ConnectorStatusMetricsGroup {
-        private ConnectMetrics connectMetrics;
-        private ConnectMetricsRegistry registry;
-        private ConcurrentMap<String, MetricGroup> connectorStatusMetrics = new ConcurrentHashMap<>();
-        private Herder herder;
-        private ConcurrentMap<ConnectorTaskId, WorkerTask> tasks;
+        private final ConnectMetrics connectMetrics;
+        private final ConnectMetricsRegistry registry;
+        private final ConcurrentMap<String, MetricGroup> connectorStatusMetrics = new ConcurrentHashMap<>();
+        private final Herder herder;
+        private final ConcurrentMap<ConnectorTaskId, WorkerTask> tasks;
 
 
         protected ConnectorStatusMetricsGroup(
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfigTransformer.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfigTransformer.java
index 1a799bb..318626b 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfigTransformer.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfigTransformer.java
@@ -20,6 +20,7 @@ import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.common.config.provider.ConfigProvider;
 import org.apache.kafka.common.config.ConfigTransformer;
 import org.apache.kafka.common.config.ConfigTransformerResult;
+import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.connect.runtime.Herder.ConfigReloadAction;
 import org.apache.kafka.connect.util.Callback;
 import org.slf4j.Logger;
@@ -34,15 +35,17 @@ import java.util.concurrent.ConcurrentMap;
  * A wrapper class to perform configuration transformations and schedule reloads for any
  * retrieved TTL values.
  */
-public class WorkerConfigTransformer {
+public class WorkerConfigTransformer implements AutoCloseable {
     private static final Logger log = LoggerFactory.getLogger(WorkerConfigTransformer.class);
 
     private final Worker worker;
     private final ConfigTransformer configTransformer;
     private final ConcurrentMap<String, Map<String, HerderRequest>> requests = new ConcurrentHashMap<>();
+    private final Map<String, ConfigProvider> configProviders;
 
     public WorkerConfigTransformer(Worker worker, Map<String, ConfigProvider> configProviders) {
         this.worker = worker;
+        this.configProviders = configProviders;
         this.configTransformer = new ConfigTransformer(configProviders);
     }
 
@@ -98,4 +101,9 @@ public class WorkerConfigTransformer {
         HerderRequest request = worker.herder().restartConnector(ttl, connectorName, cb);
         connectorRequests.put(path, request);
     }
+
+    @Override
+    public void close() {
+        configProviders.values().forEach(x -> Utils.closeQuietly(x, "config provider"));
+    }
 }
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
index 7021503..e7ffd60 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
@@ -24,6 +24,7 @@ import org.apache.kafka.common.Configurable;
 import org.apache.kafka.common.config.AbstractConfig;
 import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.common.config.provider.MockFileConfigProvider;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.connect.connector.Connector;
@@ -76,6 +77,7 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ExecutorService;
@@ -132,6 +134,7 @@ public class WorkerTest extends ThreadedTest {
     @Mock private HeaderConverter taskHeaderConverter;
     @Mock private ExecutorService executorService;
     @MockNice private ConnectorConfig connectorConfig;
+    private String mockFileProviderTestId;
 
     @Before
     public void setup() {
@@ -144,6 +147,10 @@ public class WorkerTest extends ThreadedTest {
         workerProps.put("internal.value.converter.schemas.enable", "false");
         workerProps.put("offset.storage.file.filename", "/tmp/connect.offsets");
         workerProps.put(CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG, MockMetricsReporter.class.getName());
+        workerProps.put("config.providers", "file");
+        workerProps.put("config.providers.file.class", MockFileConfigProvider.class.getName());
+        mockFileProviderTestId = UUID.randomUUID().toString();
+        workerProps.put("config.providers.file.param.testId", mockFileProviderTestId);
         config = new StandaloneConfig(workerProps);
 
         defaultProducerConfigs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
@@ -187,9 +194,11 @@ public class WorkerTest extends ThreadedTest {
 
         EasyMock.expect(connector.version()).andReturn("1.0");
 
+        expectFileConfigProvider();
         EasyMock.expect(plugins.compareAndSwapLoaders(connector))
                 .andReturn(delegatingLoader)
                 .times(2);
+
         connector.initialize(anyObject(ConnectorContext.class));
         EasyMock.expectLastCall();
         connector.start(props);
@@ -235,12 +244,24 @@ public class WorkerTest extends ThreadedTest {
         assertStatistics(worker, 0, 0);
 
         PowerMock.verifyAll();
+        MockFileConfigProvider.assertClosed(mockFileProviderTestId);
+    }
+
+    private void expectFileConfigProvider() {
+        EasyMock.expect(plugins.newConfigProvider(EasyMock.anyObject(),
+                    EasyMock.eq("config.providers.file"), EasyMock.anyObject()))
+                .andAnswer(() -> {
+                    MockFileConfigProvider mockFileConfigProvider = new MockFileConfigProvider();
+                    mockFileConfigProvider.configure(Collections.singletonMap("testId", mockFileProviderTestId));
+                    return mockFileConfigProvider;
+                });
     }
 
     @Test
     public void testStartConnectorFailure() {
         expectConverters();
         expectStartStorage();
+        expectFileConfigProvider();
 
         Map<String, String> props = new HashMap<>();
         props.put(SinkConnectorConfig.TOPICS_CONFIG, "foo,bar");
@@ -285,6 +306,7 @@ public class WorkerTest extends ThreadedTest {
     public void testAddConnectorByAlias() {
         expectConverters();
         expectStartStorage();
+        expectFileConfigProvider();
 
         EasyMock.expect(plugins.currentThreadLoader()).andReturn(delegatingLoader).times(2);
         EasyMock.expect(plugins.newConnector("WorkerTestConnector")).andReturn(connector);
@@ -349,6 +371,7 @@ public class WorkerTest extends ThreadedTest {
     public void testAddConnectorByShortAlias() {
         expectConverters();
         expectStartStorage();
+        expectFileConfigProvider();
 
         EasyMock.expect(plugins.currentThreadLoader()).andReturn(delegatingLoader).times(2);
         EasyMock.expect(plugins.newConnector("WorkerTest")).andReturn(connector);
@@ -410,6 +433,7 @@ public class WorkerTest extends ThreadedTest {
     public void testStopInvalidConnector() {
         expectConverters();
         expectStartStorage();
+        expectFileConfigProvider();
 
         PowerMock.replayAll();
 
@@ -425,6 +449,7 @@ public class WorkerTest extends ThreadedTest {
     public void testReconfigureConnectorTasks() {
         expectConverters();
         expectStartStorage();
+        expectFileConfigProvider();
 
         EasyMock.expect(plugins.currentThreadLoader()).andReturn(delegatingLoader).times(3);
         EasyMock.expect(plugins.newConnector(WorkerTestConnector.class.getName()))
@@ -513,6 +538,7 @@ public class WorkerTest extends ThreadedTest {
     public void testAddRemoveTask() throws Exception {
         expectConverters();
         expectStartStorage();
+        expectFileConfigProvider();
 
         EasyMock.expect(workerTask.id()).andStubReturn(TASK_ID);
 
@@ -607,6 +633,7 @@ public class WorkerTest extends ThreadedTest {
     public void testTaskStatusMetricsStatuses() throws Exception {
         expectConverters();
         expectStartStorage();
+        expectFileConfigProvider();
 
         EasyMock.expect(workerTask.id()).andStubReturn(TASK_ID);
 
@@ -738,10 +765,9 @@ public class WorkerTest extends ThreadedTest {
         tasks.put(new ConnectorTaskId("c1", 1), workerTask);
         tasks.put(new ConnectorTaskId("c2", 0), workerTask);
 
-
         expectConverters();
-
         expectStartStorage();
+        expectFileConfigProvider();
 
         EasyMock.expect(Plugins.compareAndSwapLoaders(pluginLoader)).andReturn(delegatingLoader);
         EasyMock.expect(Plugins.compareAndSwapLoaders(pluginLoader)).andReturn(delegatingLoader);
@@ -772,6 +798,7 @@ public class WorkerTest extends ThreadedTest {
     public void testStartTaskFailure() {
         expectConverters();
         expectStartStorage();
+        expectFileConfigProvider();
 
         Map<String, String> origProps = new HashMap<>();
         origProps.put(TaskConfig.TASK_CLASS_CONFIG, "missing.From.This.Workers.Classpath");
@@ -817,6 +844,7 @@ public class WorkerTest extends ThreadedTest {
     public void testCleanupTasksOnStop() throws Exception {
         expectConverters();
         expectStartStorage();
+        expectFileConfigProvider();
 
         EasyMock.expect(workerTask.id()).andStubReturn(TASK_ID);
 
@@ -907,6 +935,7 @@ public class WorkerTest extends ThreadedTest {
     public void testConverterOverrides() throws Exception {
         expectConverters();
         expectStartStorage();
+        expectFileConfigProvider();
 
         EasyMock.expect(workerTask.id()).andStubReturn(TASK_ID);
 


[kafka] 01/03: KAFKA-9830: Implement AutoCloseable in ErrorReporter and subclasses (#8442)

Posted by kk...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

kkarantasis pushed a commit to branch 2.4
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit 4e32bc85c44eda0820d73acd0d13a8a04dcde99c
Author: Greg Harris <gr...@confluent.io>
AuthorDate: Wed Apr 29 17:07:01 2020 -0700

    KAFKA-9830: Implement AutoCloseable in ErrorReporter and subclasses (#8442)
    
    * The DeadLetterQueueReporter has a KafkaProducer that it must close to clean up resources
    * Currently, the producer and its threads are leaked every time a task is stopped
    * Responsibility for cleaning up ErrorReporters is transitively assigned to the
        ProcessingContext, RetryWithToleranceOperator, and WorkerSinkTask/WorkerSinkTask classes
    * One new unit test in ErrorReporterTest asserts that the producer is closed by the dlq reporter
    
    Reviewers: Arjun Satish <ar...@confluent.io>, Chris Egerton <ch...@confluent.io>, Chia-Ping Tsai <ch...@gmail.com>, Konstantine Karantasis <ko...@confluent.io>
---
 .../kafka/connect/runtime/WorkerSinkTask.java      |  2 +
 .../kafka/connect/runtime/WorkerSourceTask.java    |  2 +
 .../runtime/errors/DeadLetterQueueReporter.java    |  5 ++
 .../connect/runtime/errors/ErrorReporter.java      |  4 +-
 .../connect/runtime/errors/ProcessingContext.java  | 18 ++++-
 .../runtime/errors/RetryWithToleranceOperator.java |  7 +-
 .../connect/runtime/ErrorHandlingTaskTest.java     | 89 ++++++++++++++++++++++
 .../connect/runtime/errors/ErrorReporterTest.java  | 14 ++++
 8 files changed, 138 insertions(+), 3 deletions(-)

diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
index 9a71a66..f89918c 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
@@ -32,6 +32,7 @@ import org.apache.kafka.common.metrics.stats.Max;
 import org.apache.kafka.common.metrics.stats.Rate;
 import org.apache.kafka.common.metrics.stats.Value;
 import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.connect.data.SchemaAndValue;
 import org.apache.kafka.connect.errors.ConnectException;
 import org.apache.kafka.connect.errors.RetriableException;
@@ -171,6 +172,7 @@ class WorkerSinkTask extends WorkerTask {
         } catch (Throwable t) {
             log.warn("Could not close transformation chain", t);
         }
+        Utils.closeQuietly(retryWithToleranceOperator, "retry operator");
     }
 
     @Override
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
index cea99ee..c3739b5 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
@@ -29,6 +29,7 @@ import org.apache.kafka.common.metrics.stats.Max;
 import org.apache.kafka.common.metrics.stats.Rate;
 import org.apache.kafka.common.metrics.stats.Value;
 import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.connect.errors.ConnectException;
 import org.apache.kafka.connect.errors.RetriableException;
 import org.apache.kafka.connect.header.Header;
@@ -165,6 +166,7 @@ class WorkerSourceTask extends WorkerTask {
         } catch (Throwable t) {
             log.warn("Could not close transformation chain", t);
         }
+        Utils.closeQuietly(retryWithToleranceOperator, "retry operator");
     }
 
     @Override
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/DeadLetterQueueReporter.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/DeadLetterQueueReporter.java
index fc6181d..20ed2f2 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/DeadLetterQueueReporter.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/DeadLetterQueueReporter.java
@@ -204,4 +204,9 @@ public class DeadLetterQueueReporter implements ErrorReporter {
             return null;
         }
     }
+
+    @Override
+    public void close() {
+        kafkaProducer.close();
+    }
 }
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/ErrorReporter.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/ErrorReporter.java
index 5833616..5eaa427 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/ErrorReporter.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/ErrorReporter.java
@@ -19,7 +19,7 @@ package org.apache.kafka.connect.runtime.errors;
 /**
  * Report an error using the information contained in the {@link ProcessingContext}.
  */
-public interface ErrorReporter {
+public interface ErrorReporter extends AutoCloseable {
 
     /**
      * Report an error.
@@ -28,4 +28,6 @@ public interface ErrorReporter {
      */
     void report(ProcessingContext context);
 
+    @Override
+    default void close() { }
 }
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/ProcessingContext.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/ProcessingContext.java
index f826d74..e7fb031 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/ProcessingContext.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/ProcessingContext.java
@@ -18,6 +18,7 @@ package org.apache.kafka.connect.runtime.errors;
 
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.common.record.TimestampType;
+import org.apache.kafka.connect.errors.ConnectException;
 import org.apache.kafka.connect.source.SourceRecord;
 
 import java.util.Collection;
@@ -28,7 +29,7 @@ import java.util.Objects;
  * Contains all the metadata related to the currently evaluating operation. Only one instance of this class is meant
  * to exist per task in a JVM.
  */
-class ProcessingContext {
+class ProcessingContext implements AutoCloseable {
 
     private Collection<ErrorReporter> reporters = Collections.emptyList();
 
@@ -216,4 +217,19 @@ class ProcessingContext {
         this.reporters = reporters;
     }
 
+    @Override
+    public void close() {
+        ConnectException e = null;
+        for (ErrorReporter reporter : reporters) {
+            try {
+                reporter.close();
+            } catch (Throwable t) {
+                e = e != null ? e : new ConnectException("Failed to close all reporters");
+                e.addSuppressed(t);
+            }
+        }
+        if (e != null) {
+            throw e;
+        }
+    }
 }
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperator.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperator.java
index 2513514..4e627ef 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperator.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperator.java
@@ -51,7 +51,7 @@ import java.util.concurrent.ThreadLocalRandom;
  * then it is wrapped into a ConnectException and rethrown to the caller.
  * <p>
  */
-public class RetryWithToleranceOperator {
+public class RetryWithToleranceOperator implements AutoCloseable {
 
     private static final Logger log = LoggerFactory.getLogger(RetryWithToleranceOperator.class);
 
@@ -270,4 +270,9 @@ public class RetryWithToleranceOperator {
     public boolean failed() {
         return this.context.failed();
     }
+
+    @Override
+    public void close() {
+        this.context.close();
+    }
 }
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java
index 428b3e4..bb42fc6 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.connect.runtime;
 
+import java.util.Arrays;
 import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
@@ -33,6 +34,7 @@ import org.apache.kafka.connect.errors.RetriableException;
 import org.apache.kafka.connect.json.JsonConverter;
 import org.apache.kafka.connect.runtime.distributed.ClusterConfigState;
 import org.apache.kafka.connect.runtime.errors.ErrorHandlingMetrics;
+import org.apache.kafka.connect.runtime.errors.ErrorReporter;
 import org.apache.kafka.connect.runtime.errors.LogReporter;
 import org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator;
 import org.apache.kafka.connect.runtime.errors.ToleranceType;
@@ -163,6 +165,93 @@ public class ErrorHandlingTaskTest {
     }
 
     @Test
+    public void testSinkTasksCloseErrorReporters() throws Exception {
+        ErrorReporter reporter = EasyMock.mock(ErrorReporter.class);
+
+        RetryWithToleranceOperator retryWithToleranceOperator = operator();
+        retryWithToleranceOperator.metrics(errorHandlingMetrics);
+        retryWithToleranceOperator.reporters(singletonList(reporter));
+
+        createSinkTask(initialState, retryWithToleranceOperator);
+
+        expectInitializeTask();
+        reporter.close();
+        EasyMock.expectLastCall();
+        sinkTask.stop();
+        EasyMock.expectLastCall();
+
+        consumer.close();
+        EasyMock.expectLastCall();
+
+        PowerMock.replayAll();
+
+        workerSinkTask.initialize(TASK_CONFIG);
+        workerSinkTask.initializeAndStart();
+        workerSinkTask.close();
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
+    public void testSourceTasksCloseErrorReporters() {
+        ErrorReporter reporter = EasyMock.mock(ErrorReporter.class);
+
+        RetryWithToleranceOperator retryWithToleranceOperator = operator();
+        retryWithToleranceOperator.metrics(errorHandlingMetrics);
+        retryWithToleranceOperator.reporters(singletonList(reporter));
+
+        createSourceTask(initialState, retryWithToleranceOperator);
+
+        sourceTask.stop();
+        PowerMock.expectLastCall();
+
+        producer.close(EasyMock.anyObject());
+        PowerMock.expectLastCall();
+
+        reporter.close();
+        EasyMock.expectLastCall();
+
+        PowerMock.replayAll();
+
+        workerSourceTask.initialize(TASK_CONFIG);
+        workerSourceTask.close();
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
+    public void testCloseErrorReportersExceptionPropagation() {
+        ErrorReporter reporterA = EasyMock.mock(ErrorReporter.class);
+        ErrorReporter reporterB = EasyMock.mock(ErrorReporter.class);
+
+        RetryWithToleranceOperator retryWithToleranceOperator = operator();
+        retryWithToleranceOperator.metrics(errorHandlingMetrics);
+        retryWithToleranceOperator.reporters(Arrays.asList(reporterA, reporterB));
+
+        createSourceTask(initialState, retryWithToleranceOperator);
+
+        sourceTask.stop();
+        PowerMock.expectLastCall();
+
+        producer.close(EasyMock.anyObject());
+        PowerMock.expectLastCall();
+
+        // Even though the reporters throw exceptions, they should both still be closed.
+        reporterA.close();
+        EasyMock.expectLastCall().andThrow(new RuntimeException());
+
+        reporterB.close();
+        EasyMock.expectLastCall().andThrow(new RuntimeException());
+
+        PowerMock.replayAll();
+
+        workerSourceTask.initialize(TASK_CONFIG);
+        workerSourceTask.close();
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
     public void testErrorHandlingInSinkTasks() throws Exception {
         Map<String, String> reportProps = new HashMap<>();
         reportProps.put(ConnectorConfig.ERRORS_LOG_ENABLE_CONFIG, "true");
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/errors/ErrorReporterTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/errors/ErrorReporterTest.java
index 00a922f..f01cd49 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/errors/ErrorReporterTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/errors/ErrorReporterTest.java
@@ -147,6 +147,20 @@ public class ErrorReporterTest {
     }
 
     @Test
+    public void testCloseDLQ() {
+        DeadLetterQueueReporter deadLetterQueueReporter = new DeadLetterQueueReporter(
+            producer, config(singletonMap(SinkConnectorConfig.DLQ_TOPIC_NAME_CONFIG, DLQ_TOPIC)), TASK_ID, errorHandlingMetrics);
+
+        producer.close();
+        EasyMock.expectLastCall();
+        replay(producer);
+
+        deadLetterQueueReporter.close();
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
     public void testLogOnDisabledLogReporter() {
         LogReporter logReporter = new LogReporter(TASK_ID, config(emptyMap()), errorHandlingMetrics);
 


[kafka] 02/03: KAFKA-9919: Add logging to KafkaBasedLog::readToLogEnd (#8554)

Posted by kk...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

kkarantasis pushed a commit to branch 2.4
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit 7a72a2a6da414b4974963de89365b710c6730699
Author: Chris Egerton <ch...@confluent.io>
AuthorDate: Wed Apr 29 20:42:13 2020 -0700

    KAFKA-9919: Add logging to KafkaBasedLog::readToLogEnd (#8554)
    
    Simple logging additions at TRACE level that should help when the worker can't get caught up to the end of an internal topic.
    
    Reviewers: Gwen Shapira <cs...@gmail.com>, Aakash Shah <as...@confluent.io>, Konstantine Karantasis <ko...@confluent.io>
---
 .../java/org/apache/kafka/connect/util/KafkaBasedLog.java    | 12 +++++++++---
 1 file changed, 9 insertions(+), 3 deletions(-)

diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java b/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
index e78276a..e301581 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
@@ -281,9 +281,15 @@ public class KafkaBasedLog<K, V> {
             Iterator<Map.Entry<TopicPartition, Long>> it = endOffsets.entrySet().iterator();
             while (it.hasNext()) {
                 Map.Entry<TopicPartition, Long> entry = it.next();
-                if (consumer.position(entry.getKey()) >= entry.getValue())
+                TopicPartition topicPartition = entry.getKey();
+                long endOffset = entry.getValue();
+                long lastConsumedOffset = consumer.position(topicPartition);
+                if (lastConsumedOffset >= endOffset) {
+                    log.trace("Read to end offset {} for {}", endOffset, topicPartition);
                     it.remove();
-                else {
+                } else {
+                    log.trace("Behind end offset {} for {}; last-read offset is {}",
+                            endOffset, topicPartition, lastConsumedOffset);
                     poll(Integer.MAX_VALUE);
                     break;
                 }
@@ -345,4 +351,4 @@ public class KafkaBasedLog<K, V> {
             }
         }
     }
-}
\ No newline at end of file
+}