You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ew...@apache.org on 2018/11/28 06:02:31 UTC
[kafka] branch 2.0 updated: KAFKA-7620: Fix restart logic for TTLs
in WorkerConfigTransformer
This is an automated email from the ASF dual-hosted git repository.
ewencp pushed a commit to branch 2.0
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.0 by this push:
new e7298f4 KAFKA-7620: Fix restart logic for TTLs in WorkerConfigTransformer
e7298f4 is described below
commit e7298f4fc53f27f91564f60c3818fa392287ff33
Author: Robert Yokota <ra...@gmail.com>
AuthorDate: Tue Nov 27 22:01:21 2018 -0800
KAFKA-7620: Fix restart logic for TTLs in WorkerConfigTransformer
The restart logic for TTLs in `WorkerConfigTransformer` was broken when trying to make it toggle-able. Accessing the toggle through the `Herder` causes the same code to be called recursively. This fix just accesses the toggle by simply looking in the properties map that is passed to `WorkerConfigTransformer`.
Author: Robert Yokota <ra...@gmail.com>
Reviewers: Magesh Nandakumar <ma...@gmail.com>, Ewen Cheslack-Postava <ew...@confluent.io>
Closes #5914 from rayokota/KAFKA-7620
(cherry picked from commit a2e87feb8b1db8200ca3a34aa72b0802e8f61096)
Signed-off-by: Ewen Cheslack-Postava <me...@ewencp.org>
---
.../kafka/connect/runtime/ConnectorConfig.java | 5 ++-
.../org/apache/kafka/connect/runtime/Herder.java | 6 ---
.../connect/runtime/WorkerConfigTransformer.java | 44 ++++++++++++++--------
.../runtime/distributed/DistributedHerder.java | 8 ----
.../runtime/standalone/StandaloneHerder.java | 8 ----
.../runtime/WorkerConfigTransformerTest.java | 13 ++++---
6 files changed, 39 insertions(+), 45 deletions(-)
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java
index 9d1a50d..d030fed 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java
@@ -35,6 +35,7 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
+import java.util.Locale;
import java.util.Map;
import static org.apache.kafka.common.config.ConfigDef.NonEmptyStringWithoutControlChars.nonEmptyStringWithoutControlChars;
@@ -105,8 +106,8 @@ public class ConnectorConfig extends AbstractConfig {
"indicates that a configuration value will expire in the future.";
private static final String CONFIG_RELOAD_ACTION_DISPLAY = "Reload Action";
- public static final String CONFIG_RELOAD_ACTION_NONE = Herder.ConfigReloadAction.NONE.toString();
- public static final String CONFIG_RELOAD_ACTION_RESTART = Herder.ConfigReloadAction.RESTART.toString();
+ public static final String CONFIG_RELOAD_ACTION_NONE = Herder.ConfigReloadAction.NONE.name().toLowerCase(Locale.ROOT);
+ public static final String CONFIG_RELOAD_ACTION_RESTART = Herder.ConfigReloadAction.RESTART.name().toLowerCase(Locale.ROOT);
public static final String ERRORS_RETRY_TIMEOUT_CONFIG = "errors.retry.timeout";
public static final String ERRORS_RETRY_TIMEOUT_DISPLAY = "Retry Timeout for Errors";
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Herder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Herder.java
index 5c7cc14..c572e20 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Herder.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Herder.java
@@ -149,12 +149,6 @@ public interface Herder {
void restartTask(ConnectorTaskId id, Callback<Void> cb);
/**
- * Get the configuration reload action.
- * @param connName name of the connector
- */
- ConfigReloadAction connectorConfigReloadAction(final String connName);
-
- /**
* Restart the connector.
* @param connName name of the connector
* @param cb callback to invoke upon completion
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfigTransformer.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfigTransformer.java
index 1b715c7..3373d5c 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfigTransformer.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfigTransformer.java
@@ -16,10 +16,15 @@
*/
package org.apache.kafka.connect.runtime;
+import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.provider.ConfigProvider;
import org.apache.kafka.common.config.ConfigTransformer;
import org.apache.kafka.common.config.ConfigTransformerResult;
+import org.apache.kafka.connect.runtime.Herder.ConfigReloadAction;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.util.Locale;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@@ -29,6 +34,8 @@ import java.util.concurrent.ConcurrentMap;
* retrieved TTL values.
*/
public class WorkerConfigTransformer {
+ private static final Logger log = LoggerFactory.getLogger(WorkerConfigTransformer.class);
+
private final Worker worker;
private final ConfigTransformer configTransformer;
private final ConcurrentMap<String, Map<String, HerderRequest>> requests = new ConcurrentHashMap<>();
@@ -46,7 +53,16 @@ public class WorkerConfigTransformer {
if (configs == null) return null;
ConfigTransformerResult result = configTransformer.transform(configs);
if (connectorName != null) {
- scheduleReload(connectorName, result.ttls());
+ String key = ConnectorConfig.CONFIG_RELOAD_ACTION_CONFIG;
+ String action = (String) ConfigDef.parseType(key, configs.get(key), ConfigDef.Type.STRING);
+ if (action == null) {
+ // The default action is "restart".
+ action = ConnectorConfig.CONFIG_RELOAD_ACTION_RESTART;
+ }
+ ConfigReloadAction reloadAction = ConfigReloadAction.valueOf(action.toUpperCase(Locale.ROOT));
+ if (reloadAction == ConfigReloadAction.RESTART) {
+ scheduleReload(connectorName, result.ttls());
+ }
}
return result.data();
}
@@ -58,21 +74,19 @@ public class WorkerConfigTransformer {
}
private void scheduleReload(String connectorName, String path, long ttl) {
- Herder herder = worker.herder();
- if (herder.connectorConfigReloadAction(connectorName) == Herder.ConfigReloadAction.RESTART) {
- Map<String, HerderRequest> connectorRequests = requests.get(connectorName);
- if (connectorRequests == null) {
- connectorRequests = new ConcurrentHashMap<>();
- requests.put(connectorName, connectorRequests);
- } else {
- HerderRequest previousRequest = connectorRequests.get(path);
- if (previousRequest != null) {
- // Delete previous request for ttl which is now stale
- previousRequest.cancel();
- }
+ Map<String, HerderRequest> connectorRequests = requests.get(connectorName);
+ if (connectorRequests == null) {
+ connectorRequests = new ConcurrentHashMap<>();
+ requests.put(connectorName, connectorRequests);
+ } else {
+ HerderRequest previousRequest = connectorRequests.get(path);
+ if (previousRequest != null) {
+ // Delete previous request for ttl which is now stale
+ previousRequest.cancel();
}
- HerderRequest request = herder.restartConnector(ttl, connectorName, null);
- connectorRequests.put(path, request);
}
+ log.info("Scheduling a restart of connector {} in {} ms", connectorName, ttl);
+ HerderRequest request = worker.herder().restartConnector(ttl, connectorName, null);
+ connectorRequests.put(path, request);
}
}
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
index f2009db..dc91f35 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
@@ -61,7 +61,6 @@ import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
-import java.util.Locale;
import java.util.Map;
import java.util.NavigableSet;
import java.util.NoSuchElementException;
@@ -643,13 +642,6 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
}
@Override
- public ConfigReloadAction connectorConfigReloadAction(final String connName) {
- return ConfigReloadAction.valueOf(
- configState.connectorConfig(connName).get(ConnectorConfig.CONFIG_RELOAD_ACTION_CONFIG)
- .toUpperCase(Locale.ROOT));
- }
-
- @Override
public void restartConnector(final String connName, final Callback<Void> callback) {
restartConnector(0, connName, callback);
}
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java
index 40ad980..fe31c28 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java
@@ -42,7 +42,6 @@ import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
-import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.Executors;
@@ -261,13 +260,6 @@ public class StandaloneHerder extends AbstractHerder {
}
@Override
- public ConfigReloadAction connectorConfigReloadAction(final String connName) {
- return ConfigReloadAction.valueOf(
- configState.connectorConfig(connName).get(ConnectorConfig.CONFIG_RELOAD_ACTION_CONFIG)
- .toUpperCase(Locale.ROOT));
- }
-
- @Override
public synchronized void restartConnector(String connName, Callback<Void> cb) {
if (!configState.contains(connName))
cb.onCompletion(new NotFoundException("Connector " + connName + " not found", null), null);
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerConfigTransformerTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerConfigTransformerTest.java
index 300022d..034bd51 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerConfigTransformerTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerConfigTransformerTest.java
@@ -28,9 +28,12 @@ import org.powermock.api.easymock.annotation.Mock;
import org.powermock.modules.junit4.PowerMockRunner;
import java.util.Collections;
+import java.util.HashMap;
import java.util.Map;
import java.util.Set;
+import static org.apache.kafka.connect.runtime.ConnectorConfig.CONFIG_RELOAD_ACTION_CONFIG;
+import static org.apache.kafka.connect.runtime.ConnectorConfig.CONFIG_RELOAD_ACTION_NONE;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.powermock.api.easymock.PowerMock.replayAll;
@@ -69,18 +72,18 @@ public class WorkerConfigTransformerTest {
@Test
public void testReplaceVariableWithTTL() throws Exception {
EasyMock.expect(worker.herder()).andReturn(herder);
- EasyMock.expect(herder.connectorConfigReloadAction(MY_CONNECTOR)).andReturn(Herder.ConfigReloadAction.NONE);
replayAll();
- Map<String, String> result = configTransformer.transform(MY_CONNECTOR, Collections.singletonMap(MY_KEY, "${test:testPath:testKeyWithTTL}"));
- assertEquals(TEST_RESULT_WITH_TTL, result.get(MY_KEY));
+ Map<String, String> props = new HashMap<>();
+ props.put(MY_KEY, "${test:testPath:testKeyWithTTL}");
+ props.put(CONFIG_RELOAD_ACTION_CONFIG, CONFIG_RELOAD_ACTION_NONE);
+ Map<String, String> result = configTransformer.transform(MY_CONNECTOR, props);
}
@Test
public void testReplaceVariableWithTTLAndScheduleRestart() throws Exception {
EasyMock.expect(worker.herder()).andReturn(herder);
- EasyMock.expect(herder.connectorConfigReloadAction(MY_CONNECTOR)).andReturn(Herder.ConfigReloadAction.RESTART);
EasyMock.expect(herder.restartConnector(1L, MY_CONNECTOR, null)).andReturn(requestId);
replayAll();
@@ -92,11 +95,9 @@ public class WorkerConfigTransformerTest {
@Test
public void testReplaceVariableWithTTLFirstCancelThenScheduleRestart() throws Exception {
EasyMock.expect(worker.herder()).andReturn(herder);
- EasyMock.expect(herder.connectorConfigReloadAction(MY_CONNECTOR)).andReturn(Herder.ConfigReloadAction.RESTART);
EasyMock.expect(herder.restartConnector(1L, MY_CONNECTOR, null)).andReturn(requestId);
EasyMock.expect(worker.herder()).andReturn(herder);
- EasyMock.expect(herder.connectorConfigReloadAction(MY_CONNECTOR)).andReturn(Herder.ConfigReloadAction.RESTART);
EasyMock.expectLastCall();
requestId.cancel();
EasyMock.expectLastCall();