Posted to commits@kafka.apache.org by ew...@apache.org on 2016/11/29 23:32:03 UTC

kafka git commit: KAFKA-4397: Refactor Connect backing stores for thread safety

Repository: kafka
Updated Branches:
  refs/heads/trunk f37dab76f -> d98ca230a


KAFKA-4397: Refactor Connect backing stores for thread safety

Author: Konstantine Karantasis <ko...@confluent.io>

Reviewers: Shikhar Bhushan <sh...@confluent.io>, Ewen Cheslack-Postava <ew...@confluent.io>

Closes #2123 from kkonstantine/KAFKA-4397-Refactor-connect-backing-stores-for-thread-safety
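
In short: ConfigBackingStore drops its configure(WorkerConfig) method, KafkaConfigBackingStore instead receives the WorkerConfig through its constructor and creates its KafkaBasedLog eagerly, the bookkeeping fields become final, and the startup flag becomes a volatile "started" latch so that callback threads observe a fully published store. KafkaStatusBackingStore's qualified synchronized blocks are also simplified to plain synchronized (this).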


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/d98ca230
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/d98ca230
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/d98ca230

Branch: refs/heads/trunk
Commit: d98ca230a14b0aedae752fa97f5d55b3a0c49b9c
Parents: f37dab7
Author: Konstantine Karantasis <ko...@confluent.io>
Authored: Tue Nov 29 15:30:40 2016 -0800
Committer: Ewen Cheslack-Postava <me...@ewencp.org>
Committed: Tue Nov 29 15:31:14 2016 -0800

----------------------------------------------------------------------
 .../kafka/connect/cli/ConnectDistributed.java   |  3 +-
 .../connect/storage/ConfigBackingStore.java     |  3 -
 .../storage/KafkaConfigBackingStore.java        | 75 ++++++++++----------
 .../storage/KafkaStatusBackingStore.java        |  4 +-
 .../storage/MemoryConfigBackingStore.java       |  5 --
 .../storage/KafkaConfigBackingStoreTest.java    | 30 ++++----
 6 files changed, 58 insertions(+), 62 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/d98ca230/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java b/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java
index 7a09ac3..c3a61b2 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java
@@ -78,8 +78,7 @@ public class ConnectDistributed {
         StatusBackingStore statusBackingStore = new KafkaStatusBackingStore(time, worker.getInternalValueConverter());
         statusBackingStore.configure(config);
 
-        ConfigBackingStore configBackingStore = new KafkaConfigBackingStore(worker.getInternalValueConverter());
-        configBackingStore.configure(config);
+        ConfigBackingStore configBackingStore = new KafkaConfigBackingStore(worker.getInternalValueConverter(), config);
 
         DistributedHerder herder = new DistributedHerder(config, time, worker, statusBackingStore, configBackingStore,
                 advertisedUrl.toString());
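
With configure() gone from the interface, the distributed worker obtains a fully initialized store in a single step. A minimal sketch of the new lifecycle (illustrative only; in the real code start() is invoked later, during herder startup, not here):

    Converter converter = worker.getInternalValueConverter();
    ConfigBackingStore configBackingStore = new KafkaConfigBackingStore(converter, config);
    // No intermediate configure() call: the topic is validated and the
    // KafkaBasedLog is created before the constructor returns.
    configBackingStore.start();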

http://git-wip-us.apache.org/repos/asf/kafka/blob/d98ca230/connect/runtime/src/main/java/org/apache/kafka/connect/storage/ConfigBackingStore.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/ConfigBackingStore.java b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/ConfigBackingStore.java
index 77fc43b..6ab5a1b 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/ConfigBackingStore.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/ConfigBackingStore.java
@@ -17,7 +17,6 @@
 package org.apache.kafka.connect.storage;
 
 import org.apache.kafka.connect.runtime.TargetState;
-import org.apache.kafka.connect.runtime.WorkerConfig;
 import org.apache.kafka.connect.runtime.distributed.ClusterConfigState;
 import org.apache.kafka.connect.util.ConnectorTaskId;
 
@@ -29,8 +28,6 @@ import java.util.concurrent.TimeoutException;
 
 public interface ConfigBackingStore {
 
-    void configure(WorkerConfig config);
-
     void start();
 
     void stop();
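
The interface now covers only the runtime lifecycle; how an implementation is configured is the implementation's concern, settled at construction time. Reconstructed from this diff, with the methods untouched by the commit elided:

    public interface ConfigBackingStore {

        void start();

        void stop();

        // ...remaining methods unchanged by this commit
    }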

http://git-wip-us.apache.org/repos/asf/kafka/blob/d98ca230/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java
index af8efee..1a46693 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java
@@ -192,23 +192,25 @@ public class KafkaConfigBackingStore implements ConfigBackingStore {
     private static final long READ_TO_END_TIMEOUT_MS = 30000;
 
     private final Object lock;
-    private boolean starting;
     private final Converter converter;
+    private volatile boolean started;
+    // Although updateListener is not final, it's guaranteed to be visible to any thread after its
+    // initialization as long as we always read the volatile variable "started" before we access the listener.
     private UpdateListener updateListener;
 
-    private String topic;
+    private final String topic;
     // Data is passed to the log already serialized. We use a converter to handle translating to/from generic Connect
     // format to serialized form
-    private KafkaBasedLog<String, byte[]> configLog;
+    private final KafkaBasedLog<String, byte[]> configLog;
     // Connector -> # of tasks
-    private Map<String, Integer> connectorTaskCounts = new HashMap<>();
+    private final Map<String, Integer> connectorTaskCounts = new HashMap<>();
     // Connector and task configs: name or id -> config map
-    private Map<String, Map<String, String>> connectorConfigs = new HashMap<>();
-    private Map<ConnectorTaskId, Map<String, String>> taskConfigs = new HashMap<>();
+    private final Map<String, Map<String, String>> connectorConfigs = new HashMap<>();
+    private final Map<ConnectorTaskId, Map<String, String>> taskConfigs = new HashMap<>();
 
     // Set of connectors where we saw a task commit with an incomplete set of task config updates, indicating the data
     // is in an inconsistent state and we cannot safely use them until they have been refreshed.
-    private Set<String> inconsistent = new HashSet<>();
+    private final Set<String> inconsistent = new HashSet<>();
     // The most recently read offset. This does not take into account deferred task updates/commits, so we may have
     // outstanding data to be applied.
     private volatile long offset;
@@ -218,11 +220,17 @@ public class KafkaConfigBackingStore implements ConfigBackingStore {
 
     private final Map<String, TargetState> connectorTargetStates = new HashMap<>();
 
-    public KafkaConfigBackingStore(Converter converter) {
+    public KafkaConfigBackingStore(Converter converter, WorkerConfig config) {
         this.lock = new Object();
-        this.starting = false;
+        this.started = false;
         this.converter = converter;
         this.offset = -1;
+
+        this.topic = config.getString(DistributedConfig.CONFIG_TOPIC_CONFIG);
+        if (this.topic.equals(""))
+            throw new ConfigException("Must specify topic for connector configuration.");
+
+        configLog = setupAndCreateKafkaBasedLog(this.topic, config);
     }
 
     @Override
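
Moving topic validation and log creation into the constructor is what lets configLog and the bookkeeping collections above be declared final: an empty config topic now fails fast at construction time, and everything assigned in the constructor is safely published along with the object reference. A hypothetical, self-contained sketch of the final-field guarantee this relies on:

    import java.util.HashMap;
    import java.util.Map;

    // Not Connect code: fields assigned in the constructor and declared final
    // are visible to any thread that later obtains a properly published
    // reference to the object. The map *contents* still need the lock; only
    // the field references come for free.
    class Store {
        private final Map<String, Integer> counts = new HashMap<>();
        private final String topic;

        Store(String topic) {
            if (topic.isEmpty())
                throw new IllegalArgumentException("topic required"); // fail fast
            this.topic = topic;
        }
    }
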
@@ -231,33 +239,12 @@ public class KafkaConfigBackingStore implements ConfigBackingStore {
     }
 
     @Override
-    public void configure(WorkerConfig config) {
-        topic = config.getString(DistributedConfig.CONFIG_TOPIC_CONFIG);
-        if (topic.equals(""))
-            throw new ConfigException("Must specify topic for connector configuration.");
-
-        Map<String, Object> producerProps = new HashMap<>();
-        producerProps.putAll(config.originals());
-        producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
-        producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
-        producerProps.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);
-
-        Map<String, Object> consumerProps = new HashMap<>();
-        consumerProps.putAll(config.originals());
-        consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
-        consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
-
-        configLog = createKafkaBasedLog(topic, producerProps, consumerProps, new ConsumeCallback());
-    }
-
-    @Override
     public void start() {
         log.info("Starting KafkaConfigBackingStore");
-        // During startup, callbacks are *not* invoked. You can grab a snapshot after starting -- just take care that
+        // Before startup, callbacks are *not* invoked. You can grab a snapshot after starting -- just take care that
         // updates can continue to occur in the background
-        starting = true;
         configLog.start();
-        starting = false;
+        started = true;
         log.info("Started KafkaConfigBackingStore");
     }
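
The rename is a semantic fix as much as a visibility fix: the old non-volatile "starting" flag was true only while configLog.start() replayed the log, and the callback thread had no guaranteed view of either the flag or updateListener. The latched, volatile "started" flag stays false until startup completes, and its write in start() publishes the earlier listener assignment (per the comment added to the field declarations above); the guards later in this file change from "if (!starting)" to "if (started)" accordingly. A hypothetical, self-contained sketch of the pattern, assuming the listener is registered before start() as the runtime does:

    // Not Connect code; a minimal volatile-latch publication pattern.
    class Latched {
        interface Listener { void onUpdate(String name); }

        private volatile boolean started = false;
        private Listener listener;                   // plain, non-volatile field

        void setListener(Listener l) { this.listener = l; }  // called before start()

        void start() { started = true; }             // volatile write publishes "listener"

        void onRecord(String name) {
            if (started)                             // volatile read pairs with the write above,
                listener.onUpdate(name);             // so "listener" is safely visible here
        }
    }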
 
@@ -295,7 +282,7 @@ public class KafkaConfigBackingStore implements ConfigBackingStore {
     }
 
     /**
-     * Write this connector configuration to persistent storage and wait until it has been acknowledge and read back by
+     * Write this connector configuration to persistent storage and wait until it has been acknowledged and read back by
      * tailing the Kafka log with a consumer.
      *
      * @param connector  name of the connector to write data for
@@ -416,6 +403,22 @@ public class KafkaConfigBackingStore implements ConfigBackingStore {
         configLog.send(TARGET_STATE_KEY(connector), serializedTargetState);
     }
 
+    // package private for testing
+    KafkaBasedLog<String, byte[]> setupAndCreateKafkaBasedLog(String topic, WorkerConfig config) {
+        Map<String, Object> producerProps = new HashMap<>();
+        producerProps.putAll(config.originals());
+        producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
+        producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
+        producerProps.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);
+
+        Map<String, Object> consumerProps = new HashMap<>();
+        consumerProps.putAll(config.originals());
+        consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
+        consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
+
+        return createKafkaBasedLog(topic, producerProps, consumerProps, new ConsumeCallback());
+    }
+
     private KafkaBasedLog<String, byte[]> createKafkaBasedLog(String topic, Map<String, Object> producerProps,
                                                               Map<String, Object> consumerProps, Callback<ConsumerRecord<String, byte[]>> consumedCallback) {
         return new KafkaBasedLog<>(topic, producerProps, consumerProps, consumedCallback, new SystemTime());
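
Extracting the client wiring into the package-private setupAndCreateKafkaBasedLog keeps the constructor-driven initialization testable: a same-package test can still intercept createKafkaBasedLog and exercise the wiring explicitly, as the updated tests below do:

    // From the updated tests below; TOPIC and DEFAULT_DISTRIBUTED_CONFIG are
    // existing test fixtures. This call replaces the removed configure().
    configStorage.setupAndCreateKafkaBasedLog(TOPIC, DEFAULT_DISTRIBUTED_CONFIG);
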
@@ -480,7 +483,7 @@ public class KafkaConfigBackingStore implements ConfigBackingStore {
 
                 // Note that we do not notify the update listener if the target state has been removed.
                 // Instead we depend on the removal callback of the connector config itself to notify the worker.
-                if (!starting && !removed)
+                if (started && !removed)
                     updateListener.onConnectorTargetStateChange(connectorName);
 
             } else if (record.key().startsWith(CONNECTOR_PREFIX)) {
@@ -513,7 +516,7 @@ public class KafkaConfigBackingStore implements ConfigBackingStore {
                             connectorTargetStates.put(connectorName, TargetState.STARTED);
                     }
                 }
-                if (!starting) {
+                if (started) {
                     if (removed)
                         updateListener.onConnectorConfigRemove(connectorName);
                     else
@@ -604,7 +607,7 @@ public class KafkaConfigBackingStore implements ConfigBackingStore {
                     connectorTaskCounts.put(connectorName, newTaskCount);
                 }
 
-                if (!starting)
+                if (started)
                     updateListener.onTaskConfigUpdate(updatedTasks);
             } else {
                 log.error("Discarding config update record with invalid key: " + record.key());

http://git-wip-us.apache.org/repos/asf/kafka/blob/d98ca230/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaStatusBackingStore.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaStatusBackingStore.java b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaStatusBackingStore.java
index c377ff6..38a239e 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaStatusBackingStore.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaStatusBackingStore.java
@@ -378,7 +378,7 @@ public class KafkaStatusBackingStore implements StatusBackingStore {
         if (status == null)
             return;
 
-        synchronized (KafkaStatusBackingStore.this) {
+        synchronized (this) {
             log.trace("Received connector {} status update {}", connector, status);
             CacheEntry<ConnectorStatus> entry = getOrAdd(connector);
             entry.put(status);
@@ -404,7 +404,7 @@ public class KafkaStatusBackingStore implements StatusBackingStore {
             return;
         }
 
-        synchronized (KafkaStatusBackingStore.this) {
+        synchronized (this) {
             log.trace("Received task {} status update {}", id, status);
             CacheEntry<TaskStatus> entry = getOrAdd(id);
             entry.put(status);
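
Inside an instance method of KafkaStatusBackingStore the two forms lock the same monitor, so this is purely a readability cleanup, likely left over from when these blocks sat in an anonymous callback. The qualified form is only required inside a nested class, where a bare "this" would name the inner instance. A hypothetical illustration:

    class Outer {
        void instanceMethod() {
            synchronized (this) { }           // same lock as synchronized (Outer.this)
        }

        Runnable callback = new Runnable() {
            @Override
            public void run() {
                synchronized (Outer.this) { } // a bare "this" here would be the Runnable
            }
        };
    }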

http://git-wip-us.apache.org/repos/asf/kafka/blob/d98ca230/connect/runtime/src/main/java/org/apache/kafka/connect/storage/MemoryConfigBackingStore.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/MemoryConfigBackingStore.java b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/MemoryConfigBackingStore.java
index 212022d..781b5bf 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/MemoryConfigBackingStore.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/MemoryConfigBackingStore.java
@@ -17,7 +17,6 @@
 package org.apache.kafka.connect.storage;
 
 import org.apache.kafka.connect.runtime.TargetState;
-import org.apache.kafka.connect.runtime.WorkerConfig;
 import org.apache.kafka.connect.runtime.distributed.ClusterConfigState;
 import org.apache.kafka.connect.util.ConnectorTaskId;
 
@@ -35,10 +34,6 @@ public class MemoryConfigBackingStore implements ConfigBackingStore {
     private UpdateListener updateListener;
 
     @Override
-    public void configure(WorkerConfig config) {
-    }
-
-    @Override
     public synchronized void start() {
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/d98ca230/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java
index f5bce8f..4a101c3 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java
@@ -133,6 +133,7 @@ public class KafkaConfigBackingStoreTest {
     KafkaBasedLog<String, byte[]> storeLog;
     private KafkaConfigBackingStore configStorage;
 
+    private String internalTopic;
     private Capture<String> capturedTopic = EasyMock.newCapture();
     private Capture<Map<String, Object>> capturedProducerProps = EasyMock.newCapture();
     private Capture<Map<String, Object>> capturedConsumerProps = EasyMock.newCapture();
@@ -142,7 +143,8 @@ public class KafkaConfigBackingStoreTest {
 
     @Before
     public void setUp() {
-        configStorage = PowerMock.createPartialMock(KafkaConfigBackingStore.class, new String[]{"createKafkaBasedLog"}, converter);
+        configStorage = PowerMock.createPartialMock(KafkaConfigBackingStore.class, new String[]{"createKafkaBasedLog"}, converter, DEFAULT_DISTRIBUTED_CONFIG);
+        Whitebox.setInternalState(configStorage, "configLog", storeLog);
         configStorage.setUpdateListener(configUpdateListener);
     }
 
@@ -154,7 +156,8 @@ public class KafkaConfigBackingStoreTest {
 
         PowerMock.replayAll();
 
-        configStorage.configure(DEFAULT_DISTRIBUTED_CONFIG);
+        configStorage.setupAndCreateKafkaBasedLog(TOPIC, DEFAULT_DISTRIBUTED_CONFIG);
+
         assertEquals(TOPIC, capturedTopic.getValue());
         assertEquals("org.apache.kafka.common.serialization.StringSerializer", capturedProducerProps.getValue().get(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG));
         assertEquals("org.apache.kafka.common.serialization.ByteArraySerializer", capturedProducerProps.getValue().get(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG));
@@ -193,7 +196,7 @@ public class KafkaConfigBackingStoreTest {
 
         PowerMock.replayAll();
 
-        configStorage.configure(DEFAULT_DISTRIBUTED_CONFIG);
+        configStorage.setupAndCreateKafkaBasedLog(TOPIC, DEFAULT_DISTRIBUTED_CONFIG);
         configStorage.start();
 
         // Null before writing
@@ -261,8 +264,7 @@ public class KafkaConfigBackingStoreTest {
 
         PowerMock.replayAll();
 
-
-        configStorage.configure(DEFAULT_DISTRIBUTED_CONFIG);
+        configStorage.setupAndCreateKafkaBasedLog(TOPIC, DEFAULT_DISTRIBUTED_CONFIG);
         configStorage.start();
 
         // Bootstrap as if we had already added the connector, but no tasks had been added yet
@@ -317,7 +319,7 @@ public class KafkaConfigBackingStoreTest {
 
         PowerMock.replayAll();
 
-        configStorage.configure(DEFAULT_DISTRIBUTED_CONFIG);
+        configStorage.setupAndCreateKafkaBasedLog(TOPIC, DEFAULT_DISTRIBUTED_CONFIG);
         configStorage.start();
 
         // Bootstrap as if we had already added the connector, but no tasks had been added yet
@@ -370,7 +372,7 @@ public class KafkaConfigBackingStoreTest {
 
         PowerMock.replayAll();
 
-        configStorage.configure(DEFAULT_DISTRIBUTED_CONFIG);
+        configStorage.setupAndCreateKafkaBasedLog(TOPIC, DEFAULT_DISTRIBUTED_CONFIG);
         configStorage.start();
 
         // Should see a single connector with initial state paused
@@ -412,7 +414,7 @@ public class KafkaConfigBackingStoreTest {
 
         PowerMock.replayAll();
 
-        configStorage.configure(DEFAULT_DISTRIBUTED_CONFIG);
+        configStorage.setupAndCreateKafkaBasedLog(TOPIC, DEFAULT_DISTRIBUTED_CONFIG);
         configStorage.start();
 
         // Should see a single connector with initial state paused
@@ -462,7 +464,7 @@ public class KafkaConfigBackingStoreTest {
 
         PowerMock.replayAll();
 
-        configStorage.configure(DEFAULT_DISTRIBUTED_CONFIG);
+        configStorage.setupAndCreateKafkaBasedLog(TOPIC, DEFAULT_DISTRIBUTED_CONFIG);
         configStorage.start();
 
         // Should see a single connector with initial state paused
@@ -501,7 +503,7 @@ public class KafkaConfigBackingStoreTest {
 
         PowerMock.replayAll();
 
-        configStorage.configure(DEFAULT_DISTRIBUTED_CONFIG);
+        configStorage.setupAndCreateKafkaBasedLog(TOPIC, DEFAULT_DISTRIBUTED_CONFIG);
         configStorage.start();
 
         // The target state deletion should reset the state to STARTED
@@ -548,7 +550,7 @@ public class KafkaConfigBackingStoreTest {
 
         PowerMock.replayAll();
 
-        configStorage.configure(DEFAULT_DISTRIBUTED_CONFIG);
+        configStorage.setupAndCreateKafkaBasedLog(TOPIC, DEFAULT_DISTRIBUTED_CONFIG);
         configStorage.start();
 
         // Should see a single connector and its config should be the last one seen anywhere in the log
@@ -602,7 +604,7 @@ public class KafkaConfigBackingStoreTest {
 
         PowerMock.replayAll();
 
-        configStorage.configure(DEFAULT_DISTRIBUTED_CONFIG);
+        configStorage.setupAndCreateKafkaBasedLog(TOPIC, DEFAULT_DISTRIBUTED_CONFIG);
         configStorage.start();
 
         // Should see a single connector and its config should be the last one seen anywhere in the log
@@ -649,7 +651,7 @@ public class KafkaConfigBackingStoreTest {
 
         PowerMock.replayAll();
 
-        configStorage.configure(DEFAULT_DISTRIBUTED_CONFIG);
+        configStorage.setupAndCreateKafkaBasedLog(TOPIC, DEFAULT_DISTRIBUTED_CONFIG);
         configStorage.start();
 
         // Should see a single connector and its config should be the last one seen anywhere in the log
@@ -712,7 +714,7 @@ public class KafkaConfigBackingStoreTest {
 
         PowerMock.replayAll();
 
-        configStorage.configure(DEFAULT_DISTRIBUTED_CONFIG);
+        configStorage.setupAndCreateKafkaBasedLog(TOPIC, DEFAULT_DISTRIBUTED_CONFIG);
         configStorage.start();
         // After reading the log, it should have been in an inconsistent state
         ClusterConfigState configState = configStorage.snapshot();