You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by kk...@apache.org on 2020/06/07 20:03:10 UTC
[kafka] branch 2.5 updated: KAFKA-9216: Enforce internal config
topic settings for Connect workers during startup (#8270)
This is an automated email from the ASF dual-hosted git repository.
kkarantasis pushed a commit to branch 2.5
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.5 by this push:
new e90e8ca KAFKA-9216: Enforce internal config topic settings for Connect workers during startup (#8270)
e90e8ca is described below
commit e90e8ca72429dc368d29fd877ec27dddd4d32e82
Author: Evelyn Bayes <30...@users.noreply.github.com>
AuthorDate: Mon Jun 8 05:42:00 2020 +1000
KAFKA-9216: Enforce internal config topic settings for Connect workers during startup (#8270)
Currently, Kafka Connect creates its config backing topic with a fire and forget approach.
This is fine unless someone has manually created that topic already with the wrong partition count.
In such a case Kafka Connect may run for some time, especially if it's in standalone mode, but once switched to distributed mode it will almost certainly fail.
This commit adds a check when the KafkaConfigBackingStore is starting.
This check will throw a ConfigException if there is more than one partition in the backing store.
This exception is then caught upstream and logged by either:
- DistributedHerder#run
- ConnectStandalone#main
A unit test was added in KafkaConfigBackingStoreTest to verify the behaviour.
Author: Evelyn Bayes <ev...@confluent.io>
Co-authored-by: Randall Hauch <rh...@gmail.com>
Reviewer: Konstantine Karantasis <ko...@confluent.io>
---
.../connect/storage/KafkaConfigBackingStore.java | 10 ++++++
.../apache/kafka/connect/util/KafkaBasedLog.java | 5 +++
.../storage/KafkaConfigBackingStoreTest.java | 36 ++++++++++++++++++++++
3 files changed, 51 insertions(+)
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java
index 658d6c3..4c6429c 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java
@@ -263,6 +263,16 @@ public class KafkaConfigBackingStore implements ConfigBackingStore {
// Before startup, callbacks are *not* invoked. You can grab a snapshot after starting -- just take care that
// updates can continue to occur in the background
configLog.start();
+
+ int partitionCount = configLog.partitionCount();
+ if (partitionCount > 1) {
+ String msg = String.format("Topic '%s' supplied via the '%s' property is required "
+ + "to have a single partition in order to guarantee consistency of "
+ + "connector configurations, but found %d partitions.",
+ topic, DistributedConfig.CONFIG_TOPIC_CONFIG, partitionCount);
+ throw new ConfigException(msg);
+ }
+
started = true;
log.info("Started KafkaConfigBackingStore");
}
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java b/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
index e301581..69d2588 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
@@ -74,6 +74,7 @@ public class KafkaBasedLog<K, V> {
private Time time;
private final String topic;
+ private int partitionCount;
private final Map<String, Object> producerConfigs;
private final Map<String, Object> consumerConfigs;
private final Callback<ConsumerRecord<K, V>> consumedCallback;
@@ -145,6 +146,7 @@ public class KafkaBasedLog<K, V> {
for (PartitionInfo partition : partitionInfos)
partitions.add(new TopicPartition(partition.topic(), partition.partition()));
+ partitionCount = partitions.size();
consumer.assign(partitions);
// Always consume from the beginning of all partitions. Necessary to ensure that we don't use committed offsets
@@ -238,6 +240,9 @@ public class KafkaBasedLog<K, V> {
producer.send(new ProducerRecord<>(topic, key, value), callback);
}
+ public int partitionCount() {
+ return partitionCount;
+ }
private Producer<K, V> createProducer() {
// Always require producer acks to all to ensure durable writes
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java
index 512dcb3..9a55690 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java
@@ -22,6 +22,7 @@ import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.record.TimestampType;
+import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaAndValue;
@@ -59,6 +60,7 @@ import java.util.concurrent.TimeUnit;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;
@RunWith(PowerMockRunner.class)
@@ -156,6 +158,7 @@ public class KafkaConfigBackingStoreTest {
public void testStartStop() throws Exception {
expectConfigure();
expectStart(Collections.emptyList(), Collections.emptyMap());
+ expectPartitionCount(1);
expectStop();
PowerMock.replayAll();
@@ -199,6 +202,7 @@ public class KafkaConfigBackingStoreTest {
configUpdateListener.onConnectorConfigRemove(CONNECTOR_IDS.get(1));
EasyMock.expectLastCall();
+ expectPartitionCount(1);
expectStop();
PowerMock.replayAll();
@@ -267,6 +271,7 @@ public class KafkaConfigBackingStoreTest {
serializedConfigs.put(COMMIT_TASKS_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(2));
expectReadToEnd(serializedConfigs);
+ expectPartitionCount(1);
expectStop();
PowerMock.replayAll();
@@ -351,6 +356,7 @@ public class KafkaConfigBackingStoreTest {
serializedConfigs.put(COMMIT_TASKS_CONFIG_KEYS.get(1), CONFIGS_SERIALIZED.get(4));
expectReadToEnd(serializedConfigs);
+ expectPartitionCount(1);
expectStop();
PowerMock.replayAll();
@@ -412,6 +418,7 @@ public class KafkaConfigBackingStoreTest {
serializedConfigs.put(COMMIT_TASKS_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(0));
expectReadToEnd(serializedConfigs);
+ expectPartitionCount(1);
expectStop();
PowerMock.replayAll();
@@ -465,6 +472,7 @@ public class KafkaConfigBackingStoreTest {
// Shouldn't see any callbacks since this is during startup
+ expectPartitionCount(1);
expectStop();
PowerMock.replayAll();
@@ -507,6 +515,7 @@ public class KafkaConfigBackingStoreTest {
configUpdateListener.onConnectorTargetStateChange(CONNECTOR_IDS.get(0));
EasyMock.expectLastCall();
+ expectPartitionCount(1);
expectStop();
PowerMock.replayAll();
@@ -557,6 +566,7 @@ public class KafkaConfigBackingStoreTest {
configUpdateListener.onConnectorConfigRemove(CONNECTOR_IDS.get(0));
EasyMock.expectLastCall();
+ expectPartitionCount(1);
expectStop();
PowerMock.replayAll();
@@ -602,6 +612,7 @@ public class KafkaConfigBackingStoreTest {
logOffset = 5;
expectStart(existingRecords, deserialized);
+ expectPartitionCount(1);
// Shouldn't see any callbacks since this is during startup
@@ -649,6 +660,7 @@ public class KafkaConfigBackingStoreTest {
deserialized.put(CONFIGS_SERIALIZED.get(6), TASK_CONFIG_STRUCTS.get(1));
logOffset = 7;
expectStart(existingRecords, deserialized);
+ expectPartitionCount(1);
// Shouldn't see any callbacks since this is during startup
@@ -703,6 +715,7 @@ public class KafkaConfigBackingStoreTest {
logOffset = 6;
expectStart(existingRecords, deserialized);
+ expectPartitionCount(1);
// Shouldn't see any callbacks since this is during startup
@@ -750,6 +763,7 @@ public class KafkaConfigBackingStoreTest {
deserialized.put(CONFIGS_SERIALIZED.get(7), TASKS_COMMIT_STRUCT_ZERO_TASK_CONNECTOR);
logOffset = 8;
expectStart(existingRecords, deserialized);
+ expectPartitionCount(1);
// Shouldn't see any callbacks since this is during startup
@@ -797,6 +811,7 @@ public class KafkaConfigBackingStoreTest {
deserialized.put(CONFIGS_SERIALIZED.get(5), TASK_CONFIG_STRUCTS.get(1));
logOffset = 6;
expectStart(existingRecords, deserialized);
+ expectPartitionCount(1);
// Successful attempt to write new task config
expectReadToEnd(new LinkedHashMap<>());
@@ -851,6 +866,22 @@ public class KafkaConfigBackingStoreTest {
PowerMock.verifyAll();
}
+ @Test
+ public void testExceptionOnStartWhenConfigTopicHasMultiplePartitions() throws Exception {
+ expectConfigure();
+ expectStart(Collections.emptyList(), Collections.emptyMap());
+
+ expectPartitionCount(2);
+
+ PowerMock.replayAll();
+
+ configStorage.setupAndCreateKafkaBasedLog(TOPIC, DEFAULT_DISTRIBUTED_CONFIG);
+ ConfigException e = assertThrows(ConfigException.class, () -> configStorage.start());
+ assertTrue(e.getMessage().contains("required to have a single partition"));
+
+ PowerMock.verifyAll();
+ }
+
private void expectConfigure() throws Exception {
PowerMock.expectPrivate(configStorage, "createKafkaBasedLog",
EasyMock.capture(capturedTopic), EasyMock.capture(capturedProducerProps),
@@ -859,6 +890,11 @@ public class KafkaConfigBackingStoreTest {
.andReturn(storeLog);
}
+ private void expectPartitionCount(int partitionCount) {
+ EasyMock.expect(storeLog.partitionCount())
+ .andReturn(partitionCount);
+ }
+
// If non-empty, deserializations should be a LinkedHashMap
private void expectStart(final List<ConsumerRecord<String, byte[]>> preexistingRecords,
final Map<byte[], Struct> deserializations) {