Posted to commits@kafka.apache.org by kk...@apache.org on 2020/06/07 20:02:40 UTC

[kafka] branch 2.3 updated: KAFKA-9216: Enforce internal config topic settings for Connect workers during startup (#8270)

This is an automated email from the ASF dual-hosted git repository.

kkarantasis pushed a commit to branch 2.3
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.3 by this push:
     new f924fb3  KAFKA-9216: Enforce internal config topic settings for Connect workers during startup (#8270)
f924fb3 is described below

commit f924fb3125dc1b7ff354b83dbdda2156269bc292
Author: Evelyn Bayes <30...@users.noreply.github.com>
AuthorDate: Mon Jun 8 05:42:00 2020 +1000

    KAFKA-9216: Enforce internal config topic settings for Connect workers during startup (#8270)
    
    Currently, Kafka Connect creates its config backing topic with a fire-and-forget approach.
    This is fine unless someone has already created that topic manually with the wrong partition count.
    
    In such a case, Kafka Connect may run for some time, especially in standalone mode, but once switched to distributed mode it will almost certainly fail.
    
    This commit adds a check when the KafkaConfigBackingStore is starting.
    The check throws a ConfigException if the config topic has more than one partition.
    
    This exception is then caught upstream and logged by either:
    - DistributedHerder#run
    - ConnectStandalone#main
    
    A unit test was added in KafkaConfigBackingStoreTest to verify the behaviour.
    
    Author: Evelyn Bayes <ev...@confluent.io>
    Co-authored-by: Randall Hauch <rh...@gmail.com>
    
    Reviewer: Konstantine Karantasis <ko...@confluent.io>
---
 .../connect/storage/KafkaConfigBackingStore.java   | 10 ++++++
 .../apache/kafka/connect/util/KafkaBasedLog.java   |  5 +++
 .../storage/KafkaConfigBackingStoreTest.java       | 36 ++++++++++++++++++++++
 3 files changed, 51 insertions(+)
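
The failure mode described above comes from pre-creating the config topic by hand. As an editorial illustration (not part of this commit), a minimal sketch of pre-creating the topic correctly with the Kafka AdminClient; the topic name, bootstrap servers, and replication factor below are placeholders:

    import java.util.Collections;
    import java.util.Properties;
    import org.apache.kafka.clients.admin.AdminClient;
    import org.apache.kafka.clients.admin.NewTopic;

    public class CreateConnectConfigTopic {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");  // placeholder
            try (AdminClient admin = AdminClient.create(props)) {
                // Connect requires exactly one partition for the config topic,
                // and the topic should be compacted so the latest configs are retained.
                NewTopic configTopic = new NewTopic("connect-configs", 1, (short) 3)
                        .configs(Collections.singletonMap("cleanup.policy", "compact"));
                admin.createTopics(Collections.singleton(configTopic)).all().get();
            }
        }
    }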

diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java
index b6b4ebf..a51a064 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java
@@ -249,6 +249,16 @@ public class KafkaConfigBackingStore implements ConfigBackingStore {
         // Before startup, callbacks are *not* invoked. You can grab a snapshot after starting -- just take care that
         // updates can continue to occur in the background
         configLog.start();
+
+        int partitionCount = configLog.partitionCount();
+        if (partitionCount > 1) {
+            String msg = String.format("Topic '%s' supplied via the '%s' property is required "
+                    + "to have a single partition in order to guarantee consistency of "
+                    + "connector configurations, but found %d partitions.",
+                    topic, DistributedConfig.CONFIG_TOPIC_CONFIG, partitionCount);
+            throw new ConfigException(msg);
+        }
+
         started = true;
         log.info("Started KafkaConfigBackingStore");
     }
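
The hunk above is the core of the fix: after configLog.start() has read the topic metadata, the store refuses to start if the config topic has more than one partition. As an editorial aside, the same invariant can be checked from outside a worker with the AdminClient; a sketch, again assuming a placeholder topic name and bootstrap servers:

    import java.util.Collections;
    import java.util.Properties;
    import org.apache.kafka.clients.admin.AdminClient;
    import org.apache.kafka.clients.admin.TopicDescription;

    public class VerifyConnectConfigTopic {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");  // placeholder
            try (AdminClient admin = AdminClient.create(props)) {
                TopicDescription desc = admin.describeTopics(Collections.singleton("connect-configs"))
                        .values().get("connect-configs").get();
                int partitions = desc.partitions().size();
                if (partitions > 1)
                    System.err.printf("Config topic has %d partitions; Connect requires exactly one.%n",
                            partitions);
            }
        }
    }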
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java b/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
index e301581..69d2588 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
@@ -74,6 +74,7 @@ public class KafkaBasedLog<K, V> {
 
     private Time time;
     private final String topic;
+    private int partitionCount;
     private final Map<String, Object> producerConfigs;
     private final Map<String, Object> consumerConfigs;
     private final Callback<ConsumerRecord<K, V>> consumedCallback;
@@ -145,6 +146,7 @@ public class KafkaBasedLog<K, V> {
 
         for (PartitionInfo partition : partitionInfos)
             partitions.add(new TopicPartition(partition.topic(), partition.partition()));
+        partitionCount = partitions.size();
         consumer.assign(partitions);
 
         // Always consume from the beginning of all partitions. Necessary to ensure that we don't use committed offsets
@@ -238,6 +240,9 @@ public class KafkaBasedLog<K, V> {
         producer.send(new ProducerRecord<>(topic, key, value), callback);
     }
 
+    public int partitionCount() {
+        return partitionCount;
+    }
 
     private Producer<K, V> createProducer() {
         // Always require producer acks to all to ensure durable writes
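
Note that KafkaBasedLog only discovers the partition count while start() fetches the topic's partition metadata, so the new partitionCount() accessor is meaningful only after start() has returned. A hypothetical caller, sketched (createLog() stands in for however the log is constructed):

    KafkaBasedLog<String, byte[]> configLog = createLog();  // illustrative factory
    configLog.start();                                      // partition metadata is read here
    int partitions = configLog.partitionCount();            // now reflects the actual topic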
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java
index 0a82a09..81ebdac 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java
@@ -22,6 +22,7 @@ import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.common.record.TimestampType;
+import org.apache.kafka.common.config.ConfigException;
 import org.apache.kafka.connect.data.Field;
 import org.apache.kafka.connect.data.Schema;
 import org.apache.kafka.connect.data.SchemaAndValue;
@@ -59,6 +60,7 @@ import java.util.concurrent.TimeUnit;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertThrows;
 import static org.junit.Assert.assertTrue;
 
 @RunWith(PowerMockRunner.class)
@@ -156,6 +158,7 @@ public class KafkaConfigBackingStoreTest {
     public void testStartStop() throws Exception {
         expectConfigure();
         expectStart(Collections.emptyList(), Collections.emptyMap());
+        expectPartitionCount(1);
         expectStop();
 
         PowerMock.replayAll();
@@ -199,6 +202,7 @@ public class KafkaConfigBackingStoreTest {
         configUpdateListener.onConnectorConfigRemove(CONNECTOR_IDS.get(1));
         EasyMock.expectLastCall();
 
+        expectPartitionCount(1);
         expectStop();
 
         PowerMock.replayAll();
@@ -267,6 +271,7 @@ public class KafkaConfigBackingStoreTest {
         serializedConfigs.put(COMMIT_TASKS_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(2));
         expectReadToEnd(serializedConfigs);
 
+        expectPartitionCount(1);
         expectStop();
 
         PowerMock.replayAll();
@@ -351,6 +356,7 @@ public class KafkaConfigBackingStoreTest {
         serializedConfigs.put(COMMIT_TASKS_CONFIG_KEYS.get(1), CONFIGS_SERIALIZED.get(4));
         expectReadToEnd(serializedConfigs);
 
+        expectPartitionCount(1);
         expectStop();
 
         PowerMock.replayAll();
@@ -412,6 +418,7 @@ public class KafkaConfigBackingStoreTest {
         serializedConfigs.put(COMMIT_TASKS_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(0));
         expectReadToEnd(serializedConfigs);
 
+        expectPartitionCount(1);
         expectStop();
 
         PowerMock.replayAll();
@@ -465,6 +472,7 @@ public class KafkaConfigBackingStoreTest {
 
         // Shouldn't see any callbacks since this is during startup
 
+        expectPartitionCount(1);
         expectStop();
 
         PowerMock.replayAll();
@@ -507,6 +515,7 @@ public class KafkaConfigBackingStoreTest {
         configUpdateListener.onConnectorTargetStateChange(CONNECTOR_IDS.get(0));
         EasyMock.expectLastCall();
 
+        expectPartitionCount(1);
         expectStop();
 
         PowerMock.replayAll();
@@ -557,6 +566,7 @@ public class KafkaConfigBackingStoreTest {
         configUpdateListener.onConnectorConfigRemove(CONNECTOR_IDS.get(0));
         EasyMock.expectLastCall();
 
+        expectPartitionCount(1);
         expectStop();
 
         PowerMock.replayAll();
@@ -602,6 +612,7 @@ public class KafkaConfigBackingStoreTest {
         logOffset = 5;
 
         expectStart(existingRecords, deserialized);
+        expectPartitionCount(1);
 
         // Shouldn't see any callbacks since this is during startup
 
@@ -649,6 +660,7 @@ public class KafkaConfigBackingStoreTest {
         deserialized.put(CONFIGS_SERIALIZED.get(6), TASK_CONFIG_STRUCTS.get(1));
         logOffset = 7;
         expectStart(existingRecords, deserialized);
+        expectPartitionCount(1);
 
         // Shouldn't see any callbacks since this is during startup
 
@@ -703,6 +715,7 @@ public class KafkaConfigBackingStoreTest {
 
         logOffset = 6;
         expectStart(existingRecords, deserialized);
+        expectPartitionCount(1);
 
         // Shouldn't see any callbacks since this is during startup
 
@@ -750,6 +763,7 @@ public class KafkaConfigBackingStoreTest {
         deserialized.put(CONFIGS_SERIALIZED.get(7), TASKS_COMMIT_STRUCT_ZERO_TASK_CONNECTOR);
         logOffset = 8;
         expectStart(existingRecords, deserialized);
+        expectPartitionCount(1);
 
         // Shouldn't see any callbacks since this is during startup
 
@@ -797,6 +811,7 @@ public class KafkaConfigBackingStoreTest {
         deserialized.put(CONFIGS_SERIALIZED.get(5), TASK_CONFIG_STRUCTS.get(1));
         logOffset = 6;
         expectStart(existingRecords, deserialized);
+        expectPartitionCount(1);
 
         // Successful attempt to write new task config
         expectReadToEnd(new LinkedHashMap<String, byte[]>());
@@ -851,6 +866,22 @@ public class KafkaConfigBackingStoreTest {
         PowerMock.verifyAll();
     }
 
+    @Test
+    public void testExceptionOnStartWhenConfigTopicHasMultiplePartitions() throws Exception {
+        expectConfigure();
+        expectStart(Collections.emptyList(), Collections.emptyMap());
+
+        expectPartitionCount(2);
+
+        PowerMock.replayAll();
+
+        configStorage.setupAndCreateKafkaBasedLog(TOPIC, DEFAULT_DISTRIBUTED_CONFIG);
+        ConfigException e = assertThrows(ConfigException.class, () -> configStorage.start());
+        assertTrue(e.getMessage().contains("required to have a single partition"));
+
+        PowerMock.verifyAll();
+    }
+
     private void expectConfigure() throws Exception {
         PowerMock.expectPrivate(configStorage, "createKafkaBasedLog",
                 EasyMock.capture(capturedTopic), EasyMock.capture(capturedProducerProps),
@@ -859,6 +890,11 @@ public class KafkaConfigBackingStoreTest {
                 .andReturn(storeLog);
     }
 
+    private void expectPartitionCount(int partitionCount) {
+        EasyMock.expect(storeLog.partitionCount())
+                .andReturn(partitionCount);
+    }
+
     // If non-empty, deserializations should be a LinkedHashMap
     private void expectStart(final List<ConsumerRecord<String, byte[]>> preexistingRecords,
                              final Map<byte[], Struct> deserializations) {
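
Editorial note on the test changes: because start() now calls partitionCount() unconditionally, every pre-existing start/stop test gains an expectPartitionCount(1) expectation on the mocked KafkaBasedLog, while the new testExceptionOnStartWhenConfigTopicHasMultiplePartitions stubs a count of 2 to drive the ConfigException path.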