Posted to commits@kafka.apache.org by ew...@apache.org on 2016/11/29 23:32:03 UTC
kafka git commit: KAFKA-4397: Refactor Connect backing stores for thread safety
Repository: kafka
Updated Branches:
refs/heads/trunk f37dab76f -> d98ca230a
KAFKA-4397: Refactor Connect backing stores for thread safety
Author: Konstantine Karantasis <ko...@confluent.io>
Reviewers: Shikhar Bhushan <sh...@confluent.io>, Ewen Cheslack-Postava <ew...@confluent.io>
Closes #2123 from kkonstantine/KAFKA-4397-Refactor-connect-backing-stores-for-thread-safety
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/d98ca230
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/d98ca230
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/d98ca230
Branch: refs/heads/trunk
Commit: d98ca230a14b0aedae752fa97f5d55b3a0c49b9c
Parents: f37dab7
Author: Konstantine Karantasis <ko...@confluent.io>
Authored: Tue Nov 29 15:30:40 2016 -0800
Committer: Ewen Cheslack-Postava <me...@ewencp.org>
Committed: Tue Nov 29 15:31:14 2016 -0800
----------------------------------------------------------------------
.../kafka/connect/cli/ConnectDistributed.java | 3 +-
.../connect/storage/ConfigBackingStore.java | 3 -
.../storage/KafkaConfigBackingStore.java | 75 ++++++++++----------
.../storage/KafkaStatusBackingStore.java | 4 +-
.../storage/MemoryConfigBackingStore.java | 5 --
.../storage/KafkaConfigBackingStoreTest.java | 30 ++++----
6 files changed, 58 insertions(+), 62 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/d98ca230/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java b/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java
index 7a09ac3..c3a61b2 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java
@@ -78,8 +78,7 @@ public class ConnectDistributed {
StatusBackingStore statusBackingStore = new KafkaStatusBackingStore(time, worker.getInternalValueConverter());
statusBackingStore.configure(config);
- ConfigBackingStore configBackingStore = new KafkaConfigBackingStore(worker.getInternalValueConverter());
- configBackingStore.configure(config);
+ ConfigBackingStore configBackingStore = new KafkaConfigBackingStore(worker.getInternalValueConverter(), config);
DistributedHerder herder = new DistributedHerder(config, time, worker, statusBackingStore, configBackingStore,
advertisedUrl.toString());
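
The worker-side change above removes the two-phase initialization: instead of constructing the store and then calling configure(config), the WorkerConfig is now passed to the constructor, which lets the store set up its topic and log as final fields. A minimal sketch of the new wiring (surrounding code elided, only what the hunk shows):

    // Construct the config backing store with its WorkerConfig up front;
    // there is no longer a separate configure() step to forget or misorder.
    ConfigBackingStore configBackingStore =
            new KafkaConfigBackingStore(worker.getInternalValueConverter(), config);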
http://git-wip-us.apache.org/repos/asf/kafka/blob/d98ca230/connect/runtime/src/main/java/org/apache/kafka/connect/storage/ConfigBackingStore.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/ConfigBackingStore.java b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/ConfigBackingStore.java
index 77fc43b..6ab5a1b 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/ConfigBackingStore.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/ConfigBackingStore.java
@@ -17,7 +17,6 @@
package org.apache.kafka.connect.storage;
import org.apache.kafka.connect.runtime.TargetState;
-import org.apache.kafka.connect.runtime.WorkerConfig;
import org.apache.kafka.connect.runtime.distributed.ClusterConfigState;
import org.apache.kafka.connect.util.ConnectorTaskId;
@@ -29,8 +28,6 @@ import java.util.concurrent.TimeoutException;
public interface ConfigBackingStore {
- void configure(WorkerConfig config);
-
void start();
void stop();
http://git-wip-us.apache.org/repos/asf/kafka/blob/d98ca230/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java
index af8efee..1a46693 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java
@@ -192,23 +192,25 @@ public class KafkaConfigBackingStore implements ConfigBackingStore {
private static final long READ_TO_END_TIMEOUT_MS = 30000;
private final Object lock;
- private boolean starting;
private final Converter converter;
+ private volatile boolean started;
+ // Although updateListener is not final, it's guaranteed to be visible to any thread after its
+ // initialization as long as we always read the volatile variable "started" before we access the listener.
private UpdateListener updateListener;
- private String topic;
+ private final String topic;
// Data is passed to the log already serialized. We use a converter to handle translating to/from generic Connect
// format to serialized form
- private KafkaBasedLog<String, byte[]> configLog;
+ private final KafkaBasedLog<String, byte[]> configLog;
// Connector -> # of tasks
- private Map<String, Integer> connectorTaskCounts = new HashMap<>();
+ private final Map<String, Integer> connectorTaskCounts = new HashMap<>();
// Connector and task configs: name or id -> config map
- private Map<String, Map<String, String>> connectorConfigs = new HashMap<>();
- private Map<ConnectorTaskId, Map<String, String>> taskConfigs = new HashMap<>();
+ private final Map<String, Map<String, String>> connectorConfigs = new HashMap<>();
+ private final Map<ConnectorTaskId, Map<String, String>> taskConfigs = new HashMap<>();
// Set of connectors where we saw a task commit with an incomplete set of task config updates, indicating the data
// is in an inconsistent state and we cannot safely use them until they have been refreshed.
- private Set<String> inconsistent = new HashSet<>();
+ private final Set<String> inconsistent = new HashSet<>();
// The most recently read offset. This does not take into account deferred task updates/commits, so we may have
// outstanding data to be applied.
private volatile long offset;
@@ -218,11 +220,17 @@ public class KafkaConfigBackingStore implements ConfigBackingStore {
private final Map<String, TargetState> connectorTargetStates = new HashMap<>();
- public KafkaConfigBackingStore(Converter converter) {
+ public KafkaConfigBackingStore(Converter converter, WorkerConfig config) {
this.lock = new Object();
- this.starting = false;
+ this.started = false;
this.converter = converter;
this.offset = -1;
+
+ this.topic = config.getString(DistributedConfig.CONFIG_TOPIC_CONFIG);
+ if (this.topic.equals(""))
+ throw new ConfigException("Must specify topic for connector configuration.");
+
+ configLog = setupAndCreateKafkaBasedLog(this.topic, config);
}
@Override
@@ -231,33 +239,12 @@ public class KafkaConfigBackingStore implements ConfigBackingStore {
}
@Override
- public void configure(WorkerConfig config) {
- topic = config.getString(DistributedConfig.CONFIG_TOPIC_CONFIG);
- if (topic.equals(""))
- throw new ConfigException("Must specify topic for connector configuration.");
-
- Map<String, Object> producerProps = new HashMap<>();
- producerProps.putAll(config.originals());
- producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
- producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
- producerProps.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);
-
- Map<String, Object> consumerProps = new HashMap<>();
- consumerProps.putAll(config.originals());
- consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
- consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
-
- configLog = createKafkaBasedLog(topic, producerProps, consumerProps, new ConsumeCallback());
- }
-
- @Override
public void start() {
log.info("Starting KafkaConfigBackingStore");
- // During startup, callbacks are *not* invoked. You can grab a snapshot after starting -- just take care that
+ // Before startup, callbacks are *not* invoked. You can grab a snapshot after starting -- just take care that
// updates can continue to occur in the background
- starting = true;
configLog.start();
- starting = false;
+ started = true;
log.info("Started KafkaConfigBackingStore");
}
@@ -295,7 +282,7 @@ public class KafkaConfigBackingStore implements ConfigBackingStore {
}
/**
- * Write this connector configuration to persistent storage and wait until it has been acknowledge and read back by
+ * Write this connector configuration to persistent storage and wait until it has been acknowledged and read back by
* tailing the Kafka log with a consumer.
*
* @param connector name of the connector to write data for
@@ -416,6 +403,22 @@ public class KafkaConfigBackingStore implements ConfigBackingStore {
configLog.send(TARGET_STATE_KEY(connector), serializedTargetState);
}
+ // package private for testing
+ KafkaBasedLog<String, byte[]> setupAndCreateKafkaBasedLog(String topic, WorkerConfig config) {
+ Map<String, Object> producerProps = new HashMap<>();
+ producerProps.putAll(config.originals());
+ producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
+ producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
+ producerProps.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);
+
+ Map<String, Object> consumerProps = new HashMap<>();
+ consumerProps.putAll(config.originals());
+ consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
+ consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
+
+ return createKafkaBasedLog(topic, producerProps, consumerProps, new ConsumeCallback());
+ }
+
private KafkaBasedLog<String, byte[]> createKafkaBasedLog(String topic, Map<String, Object> producerProps,
Map<String, Object> consumerProps, Callback<ConsumerRecord<String, byte[]>> consumedCallback) {
return new KafkaBasedLog<>(topic, producerProps, consumerProps, consumedCallback, new SystemTime());
@@ -480,7 +483,7 @@ public class KafkaConfigBackingStore implements ConfigBackingStore {
// Note that we do not notify the update listener if the target state has been removed.
// Instead we depend on the removal callback of the connector config itself to notify the worker.
- if (!starting && !removed)
+ if (started && !removed)
updateListener.onConnectorTargetStateChange(connectorName);
} else if (record.key().startsWith(CONNECTOR_PREFIX)) {
@@ -513,7 +516,7 @@ public class KafkaConfigBackingStore implements ConfigBackingStore {
connectorTargetStates.put(connectorName, TargetState.STARTED);
}
}
- if (!starting) {
+ if (started) {
if (removed)
updateListener.onConnectorConfigRemove(connectorName);
else
@@ -604,7 +607,7 @@ public class KafkaConfigBackingStore implements ConfigBackingStore {
connectorTaskCounts.put(connectorName, newTaskCount);
}
- if (!starting)
+ if (started)
updateListener.onTaskConfigUpdate(updatedTasks);
} else {
log.error("Discarding config update record with invalid key: " + record.key());
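
The thread-safety story in this file boils down to a safe-publication idiom: everything that can be created eagerly (topic, configLog, the config maps) becomes a final field initialized in the constructor, and the non-final updateListener is only ever read after a check of the volatile started flag, which is written once at the end of start(). The volatile write/read pair gives the log's consumer thread a happens-before edge to everything set up beforehand. A distilled sketch of the idiom follows; field names mirror the patch, but the class is simplified for illustration and is not the real KafkaConfigBackingStore:

    // Simplified illustration of the publication pattern used in the patch above.
    public class ConfigStoreSketch {
        interface UpdateListener { void onConnectorConfigUpdate(String connector); }

        private final String topic;                 // final: safely published by the constructor
        private volatile boolean started = false;   // written in start(), read on the consumer thread
        private UpdateListener updateListener;      // non-final, but only read after checking 'started'

        public ConfigStoreSketch(String topic) {
            this.topic = topic;
        }

        public void setUpdateListener(UpdateListener listener) {
            this.updateListener = listener;         // happens before the volatile write in start()
        }

        public void start() {
            // ... start the backing log and replay existing records ...
            started = true;                         // volatile write: publishes updateListener to other threads
        }

        // Invoked from the log's consumer thread for each new record.
        void onRecord(String connectorName) {
            if (started)                            // volatile read: writes made before start() are now visible
                updateListener.onConnectorConfigUpdate(connectorName);
        }
    }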
http://git-wip-us.apache.org/repos/asf/kafka/blob/d98ca230/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaStatusBackingStore.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaStatusBackingStore.java b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaStatusBackingStore.java
index c377ff6..38a239e 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaStatusBackingStore.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaStatusBackingStore.java
@@ -378,7 +378,7 @@ public class KafkaStatusBackingStore implements StatusBackingStore {
if (status == null)
return;
- synchronized (KafkaStatusBackingStore.this) {
+ synchronized (this) {
log.trace("Received connector {} status update {}", connector, status);
CacheEntry<ConnectorStatus> entry = getOrAdd(connector);
entry.put(status);
@@ -404,7 +404,7 @@ public class KafkaStatusBackingStore implements StatusBackingStore {
return;
}
- synchronized (KafkaStatusBackingStore.this) {
+ synchronized (this) {
log.trace("Received task {} status update {}", id, status);
CacheEntry<TaskStatus> entry = getOrAdd(id);
entry.put(status);
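
The synchronized change above only affects which syntax names the monitor: inside an instance method of the store, this and KafkaStatusBackingStore.this refer to the same object, so both forms lock the store itself; the qualified form is only required from an inner or anonymous class. A small illustration with hypothetical method names, not the real class:

    public class StatusStoreSketch {
        void readStatus() {
            synchronized (this) {                       // instance method: 'this' is the store itself
                // update the in-memory status cache
            }
        }

        Runnable consumeCallback() {
            return new Runnable() {
                @Override
                public void run() {
                    synchronized (StatusStoreSketch.this) {  // inner class: must qualify to lock the store, not the Runnable
                        // same monitor as readStatus()
                    }
                }
            };
        }
    }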
http://git-wip-us.apache.org/repos/asf/kafka/blob/d98ca230/connect/runtime/src/main/java/org/apache/kafka/connect/storage/MemoryConfigBackingStore.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/MemoryConfigBackingStore.java b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/MemoryConfigBackingStore.java
index 212022d..781b5bf 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/MemoryConfigBackingStore.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/MemoryConfigBackingStore.java
@@ -17,7 +17,6 @@
package org.apache.kafka.connect.storage;
import org.apache.kafka.connect.runtime.TargetState;
-import org.apache.kafka.connect.runtime.WorkerConfig;
import org.apache.kafka.connect.runtime.distributed.ClusterConfigState;
import org.apache.kafka.connect.util.ConnectorTaskId;
@@ -35,10 +34,6 @@ public class MemoryConfigBackingStore implements ConfigBackingStore {
private UpdateListener updateListener;
@Override
- public void configure(WorkerConfig config) {
- }
-
- @Override
public synchronized void start() {
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/d98ca230/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java
index f5bce8f..4a101c3 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java
@@ -133,6 +133,7 @@ public class KafkaConfigBackingStoreTest {
KafkaBasedLog<String, byte[]> storeLog;
private KafkaConfigBackingStore configStorage;
+ private String internalTopic;
private Capture<String> capturedTopic = EasyMock.newCapture();
private Capture<Map<String, Object>> capturedProducerProps = EasyMock.newCapture();
private Capture<Map<String, Object>> capturedConsumerProps = EasyMock.newCapture();
@@ -142,7 +143,8 @@ public class KafkaConfigBackingStoreTest {
@Before
public void setUp() {
- configStorage = PowerMock.createPartialMock(KafkaConfigBackingStore.class, new String[]{"createKafkaBasedLog"}, converter);
+ configStorage = PowerMock.createPartialMock(KafkaConfigBackingStore.class, new String[]{"createKafkaBasedLog"}, converter, DEFAULT_DISTRIBUTED_CONFIG);
+ Whitebox.setInternalState(configStorage, "configLog", storeLog);
configStorage.setUpdateListener(configUpdateListener);
}
@@ -154,7 +156,8 @@ public class KafkaConfigBackingStoreTest {
PowerMock.replayAll();
- configStorage.configure(DEFAULT_DISTRIBUTED_CONFIG);
+ configStorage.setupAndCreateKafkaBasedLog(TOPIC, DEFAULT_DISTRIBUTED_CONFIG);
+
assertEquals(TOPIC, capturedTopic.getValue());
assertEquals("org.apache.kafka.common.serialization.StringSerializer", capturedProducerProps.getValue().get(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG));
assertEquals("org.apache.kafka.common.serialization.ByteArraySerializer", capturedProducerProps.getValue().get(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG));
@@ -193,7 +196,7 @@ public class KafkaConfigBackingStoreTest {
PowerMock.replayAll();
- configStorage.configure(DEFAULT_DISTRIBUTED_CONFIG);
+ configStorage.setupAndCreateKafkaBasedLog(TOPIC, DEFAULT_DISTRIBUTED_CONFIG);
configStorage.start();
// Null before writing
@@ -261,8 +264,7 @@ public class KafkaConfigBackingStoreTest {
PowerMock.replayAll();
-
- configStorage.configure(DEFAULT_DISTRIBUTED_CONFIG);
+ configStorage.setupAndCreateKafkaBasedLog(TOPIC, DEFAULT_DISTRIBUTED_CONFIG);
configStorage.start();
// Bootstrap as if we had already added the connector, but no tasks had been added yet
@@ -317,7 +319,7 @@ public class KafkaConfigBackingStoreTest {
PowerMock.replayAll();
- configStorage.configure(DEFAULT_DISTRIBUTED_CONFIG);
+ configStorage.setupAndCreateKafkaBasedLog(TOPIC, DEFAULT_DISTRIBUTED_CONFIG);
configStorage.start();
// Bootstrap as if we had already added the connector, but no tasks had been added yet
@@ -370,7 +372,7 @@ public class KafkaConfigBackingStoreTest {
PowerMock.replayAll();
- configStorage.configure(DEFAULT_DISTRIBUTED_CONFIG);
+ configStorage.setupAndCreateKafkaBasedLog(TOPIC, DEFAULT_DISTRIBUTED_CONFIG);
configStorage.start();
// Should see a single connector with initial state paused
@@ -412,7 +414,7 @@ public class KafkaConfigBackingStoreTest {
PowerMock.replayAll();
- configStorage.configure(DEFAULT_DISTRIBUTED_CONFIG);
+ configStorage.setupAndCreateKafkaBasedLog(TOPIC, DEFAULT_DISTRIBUTED_CONFIG);
configStorage.start();
// Should see a single connector with initial state paused
@@ -462,7 +464,7 @@ public class KafkaConfigBackingStoreTest {
PowerMock.replayAll();
- configStorage.configure(DEFAULT_DISTRIBUTED_CONFIG);
+ configStorage.setupAndCreateKafkaBasedLog(TOPIC, DEFAULT_DISTRIBUTED_CONFIG);
configStorage.start();
// Should see a single connector with initial state paused
@@ -501,7 +503,7 @@ public class KafkaConfigBackingStoreTest {
PowerMock.replayAll();
- configStorage.configure(DEFAULT_DISTRIBUTED_CONFIG);
+ configStorage.setupAndCreateKafkaBasedLog(TOPIC, DEFAULT_DISTRIBUTED_CONFIG);
configStorage.start();
// The target state deletion should reset the state to STARTED
@@ -548,7 +550,7 @@ public class KafkaConfigBackingStoreTest {
PowerMock.replayAll();
- configStorage.configure(DEFAULT_DISTRIBUTED_CONFIG);
+ configStorage.setupAndCreateKafkaBasedLog(TOPIC, DEFAULT_DISTRIBUTED_CONFIG);
configStorage.start();
// Should see a single connector and its config should be the last one seen anywhere in the log
@@ -602,7 +604,7 @@ public class KafkaConfigBackingStoreTest {
PowerMock.replayAll();
- configStorage.configure(DEFAULT_DISTRIBUTED_CONFIG);
+ configStorage.setupAndCreateKafkaBasedLog(TOPIC, DEFAULT_DISTRIBUTED_CONFIG);
configStorage.start();
// Should see a single connector and its config should be the last one seen anywhere in the log
@@ -649,7 +651,7 @@ public class KafkaConfigBackingStoreTest {
PowerMock.replayAll();
- configStorage.configure(DEFAULT_DISTRIBUTED_CONFIG);
+ configStorage.setupAndCreateKafkaBasedLog(TOPIC, DEFAULT_DISTRIBUTED_CONFIG);
configStorage.start();
// Should see a single connector and its config should be the last one seen anywhere in the log
@@ -712,7 +714,7 @@ public class KafkaConfigBackingStoreTest {
PowerMock.replayAll();
- configStorage.configure(DEFAULT_DISTRIBUTED_CONFIG);
+ configStorage.setupAndCreateKafkaBasedLog(TOPIC, DEFAULT_DISTRIBUTED_CONFIG);
configStorage.start();
// After reading the log, it should have been in an inconsistent state
ClusterConfigState configState = configStorage.snapshot();