You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ew...@apache.org on 2018/11/28 06:02:09 UTC

[kafka] branch trunk updated: KAFKA-7620: Fix restart logic for TTLs in WorkerConfigTransformer

This is an automated email from the ASF dual-hosted git repository.

ewencp pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new a2e87fe  KAFKA-7620: Fix restart logic for TTLs in WorkerConfigTransformer
a2e87fe is described below

commit a2e87feb8b1db8200ca3a34aa72b0802e8f61096
Author: Robert Yokota <ra...@gmail.com>
AuthorDate: Tue Nov 27 22:01:21 2018 -0800

    KAFKA-7620: Fix restart logic for TTLs in WorkerConfigTransformer
    
    The restart logic for TTLs in `WorkerConfigTransformer` was broken when trying to make it toggle-able.   Accessing the toggle through the `Herder` causes the same code to be called recursively.  This fix just accesses the toggle by simply looking in the properties map that is passed to `WorkerConfigTransformer`.
    
    Author: Robert Yokota <ra...@gmail.com>
    
    Reviewers: Magesh Nandakumar <ma...@gmail.com>, Ewen Cheslack-Postava <ew...@confluent.io>
    
    Closes #5914 from rayokota/KAFKA-7620
---
 .../kafka/connect/runtime/ConnectorConfig.java     |  5 ++-
 .../org/apache/kafka/connect/runtime/Herder.java   |  6 ---
 .../connect/runtime/WorkerConfigTransformer.java   | 44 ++++++++++++++--------
 .../runtime/distributed/DistributedHerder.java     |  8 ----
 .../runtime/standalone/StandaloneHerder.java       |  8 ----
 .../runtime/WorkerConfigTransformerTest.java       | 13 ++++---
 6 files changed, 39 insertions(+), 45 deletions(-)

diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java
index efcc01d..8889aad 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java
@@ -35,6 +35,7 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.LinkedHashSet;
 import java.util.List;
+import java.util.Locale;
 import java.util.Map;
 
 import static org.apache.kafka.common.config.ConfigDef.NonEmptyStringWithoutControlChars.nonEmptyStringWithoutControlChars;
@@ -105,8 +106,8 @@ public class ConnectorConfig extends AbstractConfig {
             "indicates that a configuration value will expire in the future.";
 
     private static final String CONFIG_RELOAD_ACTION_DISPLAY = "Reload Action";
-    public static final String CONFIG_RELOAD_ACTION_NONE = Herder.ConfigReloadAction.NONE.toString();
-    public static final String CONFIG_RELOAD_ACTION_RESTART = Herder.ConfigReloadAction.RESTART.toString();
+    public static final String CONFIG_RELOAD_ACTION_NONE = Herder.ConfigReloadAction.NONE.name().toLowerCase(Locale.ROOT);
+    public static final String CONFIG_RELOAD_ACTION_RESTART = Herder.ConfigReloadAction.RESTART.name().toLowerCase(Locale.ROOT);
 
     public static final String ERRORS_RETRY_TIMEOUT_CONFIG = "errors.retry.timeout";
     public static final String ERRORS_RETRY_TIMEOUT_DISPLAY = "Retry Timeout for Errors";
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Herder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Herder.java
index 5c7cc14..c572e20 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Herder.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Herder.java
@@ -149,12 +149,6 @@ public interface Herder {
     void restartTask(ConnectorTaskId id, Callback<Void> cb);
 
     /**
-     * Get the configuration reload action.
-     * @param connName name of the connector
-     */
-    ConfigReloadAction connectorConfigReloadAction(final String connName);
-
-    /**
      * Restart the connector.
      * @param connName name of the connector
      * @param cb callback to invoke upon completion
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfigTransformer.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfigTransformer.java
index 1b715c7..3373d5c 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfigTransformer.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfigTransformer.java
@@ -16,10 +16,15 @@
  */
 package org.apache.kafka.connect.runtime;
 
+import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.common.config.provider.ConfigProvider;
 import org.apache.kafka.common.config.ConfigTransformer;
 import org.apache.kafka.common.config.ConfigTransformerResult;
+import org.apache.kafka.connect.runtime.Herder.ConfigReloadAction;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
+import java.util.Locale;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
@@ -29,6 +34,8 @@ import java.util.concurrent.ConcurrentMap;
  * retrieved TTL values.
  */
 public class WorkerConfigTransformer {
+    private static final Logger log = LoggerFactory.getLogger(WorkerConfigTransformer.class);
+
     private final Worker worker;
     private final ConfigTransformer configTransformer;
     private final ConcurrentMap<String, Map<String, HerderRequest>> requests = new ConcurrentHashMap<>();
@@ -46,7 +53,16 @@ public class WorkerConfigTransformer {
         if (configs == null) return null;
         ConfigTransformerResult result = configTransformer.transform(configs);
         if (connectorName != null) {
-            scheduleReload(connectorName, result.ttls());
+            String key = ConnectorConfig.CONFIG_RELOAD_ACTION_CONFIG;
+            String action = (String) ConfigDef.parseType(key, configs.get(key), ConfigDef.Type.STRING);
+            if (action == null) {
+                // The default action is "restart".
+                action = ConnectorConfig.CONFIG_RELOAD_ACTION_RESTART;
+            }
+            ConfigReloadAction reloadAction = ConfigReloadAction.valueOf(action.toUpperCase(Locale.ROOT));
+            if (reloadAction == ConfigReloadAction.RESTART) {
+                scheduleReload(connectorName, result.ttls());
+            }
         }
         return result.data();
     }
@@ -58,21 +74,19 @@ public class WorkerConfigTransformer {
     }
 
     private void scheduleReload(String connectorName, String path, long ttl) {
-        Herder herder = worker.herder();
-        if (herder.connectorConfigReloadAction(connectorName) == Herder.ConfigReloadAction.RESTART) {
-            Map<String, HerderRequest> connectorRequests = requests.get(connectorName);
-            if (connectorRequests == null) {
-                connectorRequests = new ConcurrentHashMap<>();
-                requests.put(connectorName, connectorRequests);
-            } else {
-                HerderRequest previousRequest = connectorRequests.get(path);
-                if (previousRequest != null) {
-                    // Delete previous request for ttl which is now stale
-                    previousRequest.cancel();
-                }
+        Map<String, HerderRequest> connectorRequests = requests.get(connectorName);
+        if (connectorRequests == null) {
+            connectorRequests = new ConcurrentHashMap<>();
+            requests.put(connectorName, connectorRequests);
+        } else {
+            HerderRequest previousRequest = connectorRequests.get(path);
+            if (previousRequest != null) {
+                // Delete previous request for ttl which is now stale
+                previousRequest.cancel();
             }
-            HerderRequest request = herder.restartConnector(ttl, connectorName, null);
-            connectorRequests.put(path, request);
         }
+        log.info("Scheduling a restart of connector {} in {} ms", connectorName, ttl);
+        HerderRequest request = worker.herder().restartConnector(ttl, connectorName, null);
+        connectorRequests.put(path, request);
     }
 }
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
index 0213730..25420fd 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
@@ -61,7 +61,6 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
-import java.util.Locale;
 import java.util.Map;
 import java.util.NavigableSet;
 import java.util.NoSuchElementException;
@@ -643,13 +642,6 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
     }
 
     @Override
-    public ConfigReloadAction connectorConfigReloadAction(final String connName) {
-        return ConfigReloadAction.valueOf(
-                configState.connectorConfig(connName).get(ConnectorConfig.CONFIG_RELOAD_ACTION_CONFIG)
-                        .toUpperCase(Locale.ROOT));
-    }
-
-    @Override
     public void restartConnector(final String connName, final Callback<Void> callback) {
         restartConnector(0, connName, callback);
     }
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java
index 40ad980..fe31c28 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java
@@ -42,7 +42,6 @@ import org.slf4j.LoggerFactory;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
-import java.util.Locale;
 import java.util.Map;
 import java.util.Objects;
 import java.util.concurrent.Executors;
@@ -261,13 +260,6 @@ public class StandaloneHerder extends AbstractHerder {
     }
 
     @Override
-    public ConfigReloadAction connectorConfigReloadAction(final String connName) {
-        return ConfigReloadAction.valueOf(
-                configState.connectorConfig(connName).get(ConnectorConfig.CONFIG_RELOAD_ACTION_CONFIG)
-                        .toUpperCase(Locale.ROOT));
-    }
-
-    @Override
     public synchronized void restartConnector(String connName, Callback<Void> cb) {
         if (!configState.contains(connName))
             cb.onCompletion(new NotFoundException("Connector " + connName + " not found", null), null);
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerConfigTransformerTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerConfigTransformerTest.java
index b5da264..e30acb1 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerConfigTransformerTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerConfigTransformerTest.java
@@ -28,9 +28,12 @@ import org.powermock.api.easymock.annotation.Mock;
 import org.powermock.modules.junit4.PowerMockRunner;
 
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.Map;
 import java.util.Set;
 
+import static org.apache.kafka.connect.runtime.ConnectorConfig.CONFIG_RELOAD_ACTION_CONFIG;
+import static org.apache.kafka.connect.runtime.ConnectorConfig.CONFIG_RELOAD_ACTION_NONE;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
 import static org.powermock.api.easymock.PowerMock.replayAll;
@@ -69,18 +72,18 @@ public class WorkerConfigTransformerTest {
     @Test
     public void testReplaceVariableWithTTL() {
         EasyMock.expect(worker.herder()).andReturn(herder);
-        EasyMock.expect(herder.connectorConfigReloadAction(MY_CONNECTOR)).andReturn(Herder.ConfigReloadAction.NONE);
 
         replayAll();
 
-        Map<String, String> result = configTransformer.transform(MY_CONNECTOR, Collections.singletonMap(MY_KEY, "${test:testPath:testKeyWithTTL}"));
-        assertEquals(TEST_RESULT_WITH_TTL, result.get(MY_KEY));
+        Map<String, String> props = new HashMap<>();
+        props.put(MY_KEY, "${test:testPath:testKeyWithTTL}");
+        props.put(CONFIG_RELOAD_ACTION_CONFIG, CONFIG_RELOAD_ACTION_NONE);
+        Map<String, String> result = configTransformer.transform(MY_CONNECTOR, props);
     }
 
     @Test
     public void testReplaceVariableWithTTLAndScheduleRestart() {
         EasyMock.expect(worker.herder()).andReturn(herder);
-        EasyMock.expect(herder.connectorConfigReloadAction(MY_CONNECTOR)).andReturn(Herder.ConfigReloadAction.RESTART);
         EasyMock.expect(herder.restartConnector(1L, MY_CONNECTOR, null)).andReturn(requestId);
 
         replayAll();
@@ -92,11 +95,9 @@ public class WorkerConfigTransformerTest {
     @Test
     public void testReplaceVariableWithTTLFirstCancelThenScheduleRestart() {
         EasyMock.expect(worker.herder()).andReturn(herder);
-        EasyMock.expect(herder.connectorConfigReloadAction(MY_CONNECTOR)).andReturn(Herder.ConfigReloadAction.RESTART);
         EasyMock.expect(herder.restartConnector(1L, MY_CONNECTOR, null)).andReturn(requestId);
 
         EasyMock.expect(worker.herder()).andReturn(herder);
-        EasyMock.expect(herder.connectorConfigReloadAction(MY_CONNECTOR)).andReturn(Herder.ConfigReloadAction.RESTART);
         EasyMock.expectLastCall();
         requestId.cancel();
         EasyMock.expectLastCall();