Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2022/06/14 13:40:53 UTC

[GitHub] [kafka] showuon commented on a diff in pull request #11781: KAFKA-10000: Per-connector offsets topics (KIP-618)

showuon commented on code in PR #11781:
URL: https://github.com/apache/kafka/pull/11781#discussion_r896805607


##########
connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java:
##########
@@ -63,8 +67,62 @@
 public class KafkaOffsetBackingStore implements OffsetBackingStore {
     private static final Logger log = LoggerFactory.getLogger(KafkaOffsetBackingStore.class);
 
-    private KafkaBasedLog<byte[], byte[]> offsetLog;
-    private HashMap<ByteBuffer, ByteBuffer> data;
+    /**
+     * Build a connector-specific offset store with read and write support.
+     * @param topic the name of the offsets topic to use
+     * @param producer the producer to use for writing to the offsets topic
+     * @param consumer the consumer to use for reading from the offsets topic
+     * @param topicAdmin the topic admin to use for creating and querying metadata for the offsets topic
+     * @return an offset store backed by the given topic and Kafka clients
+     */
+    public static KafkaOffsetBackingStore forTask(String topic,
+                                                       Producer<byte[], byte[]> producer,
+                                                       Consumer<byte[], byte[]> consumer,
+                                                       TopicAdmin topicAdmin) {
+        return new KafkaOffsetBackingStore(() -> topicAdmin) {
+            @Override
+            public void configure(final WorkerConfig config) {
+                offsetLog = KafkaBasedLog.withExistingClients(
+                        topic,
+                        consumer,
+                        producer,
+                        topicAdmin,
+                        consumedCallback,
+                        Time.SYSTEM,
+                        initialize(topic, topicDescription(topic, config))
+                );
+            }
+        };
+    }
+
+    /**
+     * Build a connector-specific offset store with read-only support.
+     * @param topic the name of the offsets topic to use
+     * @param consumer the consumer to use for reading from the offsets topic
+     * @param topicAdmin the topic admin to use for creating and querying metadata for the offsets topic
+     * @return a read-only offset store backed by the given topic and Kafka clients
+     */
+    public static KafkaOffsetBackingStore forConnector(String topic,
+                                                  Consumer<byte[], byte[]> consumer,
+                                                  TopicAdmin topicAdmin) {

Review Comment:
   nit: indent; the continuation parameters should be aligned with `String topic`



##########
connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java:
##########
@@ -381,6 +435,10 @@ Map<TopicPartition, Long> readEndOffsets(Set<TopicPartition> assignment, boolean
             } catch (UnsupportedVersionException e) {
                 // This may happen with really old brokers that don't support the auto topic creation
                 // field in metadata requests
+                if (requireAdminForOffsets) {
+                    // Should be handled by the caller during log startup
+                    throw e;

Review Comment:
   Should we also set `admin = null` in this case, as the existing comment describes:
   `// Forget the reference to the admin so that we won't even try to use the admin the next time this method is called`?
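   One way the suggestion could look, as a minimal self-contained sketch. `EndOffsetReader`, the string-keyed partitions, and the function-typed clients are hypothetical simplifications of `KafkaBasedLog`, and the local `UnsupportedVersionException` is a stand-in for Kafka's own class. The admin reference is cleared before the rethrow, so a later call never retries it.

```java
import java.util.Map;
import java.util.Set;
import java.util.function.Function;

// Hypothetical stand-in for org.apache.kafka.common.errors.UnsupportedVersionException
class UnsupportedVersionException extends RuntimeException {
    UnsupportedVersionException(String message) {
        super(message);
    }
}

// Hypothetical, simplified model of the end-offset lookup in KafkaBasedLog:
// partitions are plain strings and the clients are plain functions.
class EndOffsetReader {
    private Function<Set<String>, Map<String, Long>> admin; // null once forgotten
    private final Function<Set<String>, Map<String, Long>> consumer;
    private final boolean requireAdminForOffsets;

    EndOffsetReader(Function<Set<String>, Map<String, Long>> admin,
                    Function<Set<String>, Map<String, Long>> consumer,
                    boolean requireAdminForOffsets) {
        this.admin = admin;
        this.consumer = consumer;
        this.requireAdminForOffsets = requireAdminForOffsets;
    }

    Map<String, Long> readEndOffsets(Set<String> partitions) {
        if (admin != null) {
            try {
                return admin.apply(partitions);
            } catch (UnsupportedVersionException e) {
                // Forget the admin in both branches so it is never retried on a later call
                admin = null;
                if (requireAdminForOffsets) {
                    // Should be handled by the caller during log startup
                    throw e;
                }
                // Otherwise fall through and let the consumer handle the read
            }
        } else if (requireAdminForOffsets) {
            // Assumption: without an admin, refuse to fall back to the consumer
            throw new IllegalStateException("Admin client required for end offsets but unavailable");
        }
        return consumer.apply(partitions);
    }

    boolean hasAdmin() {
        return admin != null;
    }
}
```

   Design note: once the admin is forgotten, a caller that ignored the startup failure would otherwise fall back to the consumer silently on the next call. The sketch therefore refuses that fallback when `requireAdminForOffsets` is set; this guard is an added assumption, not something the PR contains.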



##########
connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java:
##########
@@ -63,8 +67,62 @@
 public class KafkaOffsetBackingStore implements OffsetBackingStore {
     private static final Logger log = LoggerFactory.getLogger(KafkaOffsetBackingStore.class);
 
-    private KafkaBasedLog<byte[], byte[]> offsetLog;
-    private HashMap<ByteBuffer, ByteBuffer> data;
+    /**
+     * Build a connector-specific offset store with read and write support.
+     * @param topic the name of the offsets topic to use
+     * @param producer the producer to use for writing to the offsets topic
+     * @param consumer the consumer to use for reading from the offsets topic
+     * @param topicAdmin the topic admin to use for creating and querying metadata for the offsets topic
+     * @return an offset store backed by the given topic and Kafka clients
+     */
+    public static KafkaOffsetBackingStore forTask(String topic,
+                                                       Producer<byte[], byte[]> producer,
+                                                       Consumer<byte[], byte[]> consumer,
+                                                       TopicAdmin topicAdmin) {

Review Comment:
   nit: indent; the continuation parameters should be aligned with `String topic`



##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java:
##########
@@ -1279,6 +1288,455 @@ public void testAdminConfigsClientOverridesWithNonePolicy() {
         verify(connectorConfig).originalsWithPrefix(ConnectorConfig.CONNECTOR_CLIENT_ADMIN_OVERRIDES_PREFIX);
     }
 
+    @Test
+    public void testRegularSourceOffsetsConsumerConfigs() {
+        final Map<String, Object> connectorConsumerOverrides = new HashMap<>();
+        when(connectorConfig.originalsWithPrefix(ConnectorConfig.CONNECTOR_CLIENT_CONSUMER_OVERRIDES_PREFIX)).thenReturn(connectorConsumerOverrides);
+
+        Map<String, String> workerProps = new HashMap<>(this.workerProps);
+        workerProps.put("exactly.once.source.support", "enabled");
+        workerProps.put("bootstrap.servers", "localhost:4761");
+        workerProps.put("group.id", "connect-cluster");
+        workerProps.put("config.storage.topic", "connect-configs");
+        workerProps.put("offset.storage.topic", "connect-offsets");
+        workerProps.put("status.storage.topic", "connect-statuses");
+        config = new DistributedConfig(workerProps);
+
+        Map<String, Object> consumerConfigs = Worker.regularSourceOffsetsConsumerConfigs(
+                "test", "", config, connectorConfig, null, allConnectorClientConfigOverridePolicy, CLUSTER_ID);
+        assertEquals("localhost:4761", consumerConfigs.get(BOOTSTRAP_SERVERS_CONFIG));
+        assertEquals("read_committed", consumerConfigs.get(ISOLATION_LEVEL_CONFIG));
+
+        workerProps.put("consumer." + BOOTSTRAP_SERVERS_CONFIG, "localhost:9021");
+        workerProps.put("consumer." + ISOLATION_LEVEL_CONFIG, "read_uncommitted");
+        config = new DistributedConfig(workerProps);
+        consumerConfigs = Worker.regularSourceOffsetsConsumerConfigs(
+                "test", "", config, connectorConfig, null, allConnectorClientConfigOverridePolicy, CLUSTER_ID);
+        assertEquals("localhost:9021", consumerConfigs.get(BOOTSTRAP_SERVERS_CONFIG));
+        // User is allowed to override the isolation level for regular (non-exactly-once) source connectors and their tasks
+        assertEquals("read_uncommitted", consumerConfigs.get(ISOLATION_LEVEL_CONFIG));
+
+        workerProps.remove("consumer." + ISOLATION_LEVEL_CONFIG);
+        connectorConsumerOverrides.put(BOOTSTRAP_SERVERS_CONFIG, "localhost:489");
+        connectorConsumerOverrides.put(ISOLATION_LEVEL_CONFIG, "read_uncommitted");
+        config = new DistributedConfig(workerProps);
+        consumerConfigs = Worker.regularSourceOffsetsConsumerConfigs(
+                "test", "", config, connectorConfig, null, allConnectorClientConfigOverridePolicy, CLUSTER_ID);
+        assertEquals("localhost:489", consumerConfigs.get(BOOTSTRAP_SERVERS_CONFIG));
+        // User is allowed to override the isolation level for regular (non-exactly-once) source connectors and their tasks
+        assertEquals("read_uncommitted", consumerConfigs.get(ISOLATION_LEVEL_CONFIG));
+    }
+
+    @Test
+    public void testExactlyOnceSourceOffsetsConsumerConfigs() {
+        final Map<String, Object> connectorConsumerOverrides = new HashMap<>();
+        when(connectorConfig.originalsWithPrefix(ConnectorConfig.CONNECTOR_CLIENT_CONSUMER_OVERRIDES_PREFIX)).thenReturn(connectorConsumerOverrides);
+
+        Map<String, String> workerProps = new HashMap<>(this.workerProps);
+        workerProps.put("exactly.once.source.support", "enabled");
+        workerProps.put("bootstrap.servers", "localhost:4761");
+        workerProps.put("group.id", "connect-cluster");
+        workerProps.put("config.storage.topic", "connect-configs");
+        workerProps.put("offset.storage.topic", "connect-offsets");
+        workerProps.put("status.storage.topic", "connect-statuses");
+        config = new DistributedConfig(workerProps);
+
+        Map<String, Object> consumerConfigs = Worker.exactlyOnceSourceOffsetsConsumerConfigs(
+                "test", "", config, connectorConfig, null, allConnectorClientConfigOverridePolicy, CLUSTER_ID);
+        assertEquals("localhost:4761", consumerConfigs.get(BOOTSTRAP_SERVERS_CONFIG));
+        assertEquals("read_committed", consumerConfigs.get(ISOLATION_LEVEL_CONFIG));
+
+        workerProps.put("consumer." + BOOTSTRAP_SERVERS_CONFIG, "localhost:9021");
+        workerProps.put("consumer." + ISOLATION_LEVEL_CONFIG, "read_uncommitted");
+        config = new DistributedConfig(workerProps);
+        consumerConfigs = Worker.exactlyOnceSourceOffsetsConsumerConfigs(
+                "test", "", config, connectorConfig, null, allConnectorClientConfigOverridePolicy, CLUSTER_ID);
+        assertEquals("localhost:9021", consumerConfigs.get(BOOTSTRAP_SERVERS_CONFIG));
+        // User is not allowed to override isolation level when exactly-once support is enabled
+        assertEquals("read_committed", consumerConfigs.get(ISOLATION_LEVEL_CONFIG));
+
+        workerProps.remove("consumer." + ISOLATION_LEVEL_CONFIG);
+        connectorConsumerOverrides.put(BOOTSTRAP_SERVERS_CONFIG, "localhost:489");
+        connectorConsumerOverrides.put(ISOLATION_LEVEL_CONFIG, "read_uncommitted");
+        config = new DistributedConfig(workerProps);
+        consumerConfigs = Worker.exactlyOnceSourceOffsetsConsumerConfigs(
+                "test", "", config, connectorConfig, null, allConnectorClientConfigOverridePolicy, CLUSTER_ID);
+        assertEquals("localhost:489", consumerConfigs.get(BOOTSTRAP_SERVERS_CONFIG));
+        // User is not allowed to override isolation level when exactly-once support is enabled
+        assertEquals("read_committed", consumerConfigs.get(ISOLATION_LEVEL_CONFIG));
+    }
+
+    @Test
+    public void testExactlyOnceSourceTaskProducerConfigs() {
+        final Map<String, Object> connectorProducerOverrides = new HashMap<>();
+        when(connectorConfig.originalsWithPrefix(CONNECTOR_CLIENT_PRODUCER_OVERRIDES_PREFIX)).thenReturn(connectorProducerOverrides);
+
+        final String groupId = "connect-cluster";
+        final String transactionalId = Worker.taskTransactionalId(groupId, TASK_ID.connector(), TASK_ID.task());
+
+        Map<String, String> workerProps = new HashMap<>(this.workerProps);
+        workerProps.put("exactly.once.source.support", "enabled");
+        workerProps.put("bootstrap.servers", "localhost:4761");
+        workerProps.put("group.id", groupId);
+        workerProps.put("config.storage.topic", "connect-configs");
+        workerProps.put("offset.storage.topic", "connect-offsets");
+        workerProps.put("status.storage.topic", "connect-statuses");
+        config = new DistributedConfig(workerProps);
+
+        Map<String, Object> producerConfigs = Worker.exactlyOnceSourceTaskProducerConfigs(
+                TASK_ID, config, connectorConfig, null, allConnectorClientConfigOverridePolicy, CLUSTER_ID);
+        assertEquals("localhost:4761", producerConfigs.get(BOOTSTRAP_SERVERS_CONFIG));
+        assertEquals("true", producerConfigs.get(ENABLE_IDEMPOTENCE_CONFIG));
+        assertEquals(transactionalId, producerConfigs.get(TRANSACTIONAL_ID_CONFIG));
+
+        workerProps.put("producer." + BOOTSTRAP_SERVERS_CONFIG, "localhost:9021");
+        workerProps.put("producer." + ENABLE_IDEMPOTENCE_CONFIG, "false");
+        workerProps.put("producer." + TRANSACTIONAL_ID_CONFIG, "some-other-transactional-id");
+        config = new DistributedConfig(workerProps);
+        producerConfigs = Worker.exactlyOnceSourceTaskProducerConfigs(
+                TASK_ID, config, connectorConfig, null, allConnectorClientConfigOverridePolicy, CLUSTER_ID);
+        assertEquals("localhost:9021", producerConfigs.get(BOOTSTRAP_SERVERS_CONFIG));
+        // User is not allowed to override idempotence or transactional ID for exactly-once source tasks
+        assertEquals("true", producerConfigs.get(ENABLE_IDEMPOTENCE_CONFIG));
+        assertEquals(transactionalId, producerConfigs.get(TRANSACTIONAL_ID_CONFIG));
+
+        workerProps.remove("producer." + ENABLE_IDEMPOTENCE_CONFIG);
+        workerProps.remove("producer." + TRANSACTIONAL_ID_CONFIG);
+        connectorProducerOverrides.put(BOOTSTRAP_SERVERS_CONFIG, "localhost:489");
+        connectorProducerOverrides.put(ENABLE_IDEMPOTENCE_CONFIG, "false");
+        connectorProducerOverrides.put(TRANSACTIONAL_ID_CONFIG, "yet-another-transactional-id");

Review Comment:
   Could a user override `transactional.id` to `null`? Should we add a test case for that?
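   A minimal sketch of that test case, written against a hypothetical, simplified stand-in for the producer-config assembly (`ExactlyOnceProducerConfigsSketch`, `producerConfigs`, and the transactional ID value are illustrative only, not Kafka's `Worker.exactlyOnceSourceTaskProducerConfigs`). A `HashMap` of overrides can hold an explicit `null` for `transactional.id`, so the case is worth pinning down; in `WorkerTest` the analogous step would presumably be `connectorProducerOverrides.put(TRANSACTIONAL_ID_CONFIG, null)`.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified model of how an exactly-once source task's producer
// configs might be assembled; not Kafka's actual implementation.
class ExactlyOnceProducerConfigsSketch {
    static final String ENABLE_IDEMPOTENCE_CONFIG = "enable.idempotence";
    static final String TRANSACTIONAL_ID_CONFIG = "transactional.id";

    static Map<String, Object> producerConfigs(Map<String, Object> workerProducerProps,
                                               Map<String, Object> connectorOverrides,
                                               String transactionalId) {
        Map<String, Object> result = new HashMap<>(workerProducerProps);
        // Connector-level overrides take precedence over worker-level settings;
        // HashMap.putAll copies explicit null values as-is
        result.putAll(connectorOverrides);
        // Unconditionally enforce the exactly-once settings; put() also replaces
        // an explicit null supplied by the user
        result.put(ENABLE_IDEMPOTENCE_CONFIG, "true");
        result.put(TRANSACTIONAL_ID_CONFIG, transactionalId);
        return result;
    }
}
```

   If the enforcement step instead checked `containsKey` before writing, a `null` override would survive, which is exactly what the suggested test would catch.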



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org