Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/06/04 11:49:14 UTC

[GitHub] [kafka] mimaison commented on a change in pull request #7577: KAFKA-9076: support consumer offset sync across clusters in MM 2.0

mimaison commented on a change in pull request #7577:
URL: https://github.com/apache/kafka/pull/7577#discussion_r435134966



##########
File path: connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java
##########
@@ -16,6 +16,13 @@
  */
 package org.apache.kafka.connect.mirror;
 
+import java.util.HashMap;

Review comment:
       can we move these imports with the other `java.util` imports?
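       For illustration, grouping the new import with the other `java.util` imports would look
       roughly like this (a sketch; the exact set of imports already present in the class is assumed):

           import java.util.HashMap;
           import java.util.List;
           import java.util.Map;
           import java.util.Map.Entry;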

##########
File path: connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java
##########
@@ -188,4 +219,101 @@ public void commitRecord(SourceRecord record) {
             Checkpoint.unwrapGroup(record.sourcePartition()),
             System.currentTimeMillis() - record.timestamp());
     }
+
+    private void refreshIdleConsumerGroupOffset() {
+        Map<String, KafkaFuture<ConsumerGroupDescription>> consumerGroupsDesc = targetAdminClient
+            .describeConsumerGroups(consumerGroups).describedGroups();
+
+        for (String group : consumerGroups) {
+            try {
+                ConsumerGroupDescription consumerGroupDesc = consumerGroupsDesc.get(group).get();
+                ConsumerGroupState consumerGroupState = consumerGroupDesc.state();
+                // sync the offset to the target cluster only if the state of the current consumer group is:
+                // (1) idle: the consumer at the target is not actively consuming the mirrored topic
+                // (2) dead: the consumer group was recently created at the source and has never existed at the target
+                if (consumerGroupState.equals(ConsumerGroupState.EMPTY)) {
+                    idleConsumerGroupsOffset.put(group, targetAdminClient.listConsumerGroupOffsets(group)
+                        .partitionsToOffsetAndMetadata().get().entrySet().stream().collect(
+                            Collectors.toMap(e -> e.getKey(), e -> e.getValue())));
+                }
+                // new consumer upstream has state "DEAD" and will be identified during the offset sync-up
+            } catch (InterruptedException | ExecutionException e) {
+                log.error("Error querying for consumer group {} on cluster {}.", group, targetClusterAlias, e);
+            }
+        }
+    }
+
+    Map<String, Map<TopicPartition, OffsetAndMetadata>> syncGroupOffset() {
+        Map<String, Map<TopicPartition, OffsetAndMetadata>> offsetToSyncAll = new HashMap<>();
+
+        // first, sync offsets for the idle consumers at target
+        for (Entry<String, Map<TopicPartition, OffsetAndMetadata>> group : getConvertedUpstreamOffset().entrySet()) {
+            String consumerGroupId = group.getKey();
+            // for each idle consumer at target, read the checkpoints (converted upstream offset)
+            // from the pre-populated map
+            Map<TopicPartition, OffsetAndMetadata> convertedUpstreamOffset = group.getValue();
+
+            Map<TopicPartition, OffsetAndMetadata> offsetToSync = new HashMap<>();
+            Map<TopicPartition, OffsetAndMetadata> targetConsumerOffset = idleConsumerGroupsOffset.get(consumerGroupId);
+            if (targetConsumerOffset == null) {
+                // this is a new consumer, just sync the offset to target
+                syncGroupOffset(consumerGroupId, convertedUpstreamOffset);
+                offsetToSyncAll.put(consumerGroupId, convertedUpstreamOffset);
+                continue;
+            }
+
+            for (Entry<TopicPartition, OffsetAndMetadata> convertedEntry : convertedUpstreamOffset.entrySet()) {
+
+                TopicPartition topicPartition = convertedEntry.getKey();
+                OffsetAndMetadata convertedOffset = convertedUpstreamOffset.get(topicPartition);
+                if (!targetConsumerOffset.containsKey(topicPartition)) {
+                    // if it is a new topicPartition from upstream, just sync the offset to target
+                    offsetToSync.put(topicPartition, convertedOffset);
+                    continue;
+                }
+
+                // if the translated offset from upstream is smaller than the current consumer offset
+                // in the target, skip updating the offset for that partition
+                long latestDownstreamOffset = targetConsumerOffset.get(topicPartition).offset();
+                if (latestDownstreamOffset >= convertedOffset.offset()) {
+                    log.trace("latestDownstreamOffset {} is larger than convertedUpstreamOffset {} for "

Review comment:
       `larger or equal`?

##########
File path: connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorConnectorsIntegrationTest.java
##########
@@ -207,8 +185,30 @@ public void close() {
         backup.stop();
     }
 
+
+
     @Test
     public void testReplication() throws InterruptedException {
+
+        // create consumers before starting the connectors so we don't need to wait for discovery
+        Consumer<byte[], byte[]> consumer3 = primary.kafka().createConsumerAndSubscribeTo(Collections.singletonMap(

Review comment:
       It's a bit unusual to have `consumer3` and `consumer4` without 1 and 2 =)

##########
File path: connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java
##########
@@ -79,7 +93,15 @@ public void start(Map<String, String> props) {
         pollTimeout = config.consumerPollTimeout();
         offsetSyncStore = new OffsetSyncStore(config);
         sourceAdminClient = AdminClient.create(config.sourceAdminConfig());
+        targetAdminClient = (KafkaAdminClient) Admin.create(config.targetAdminConfig());

Review comment:
       can we change the type definition of these 2 to be `Admin`? Then we don't need the cast
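       A minimal sketch of that change (assuming the field is only used through the `Admin` interface):

           // declaring the field as the Admin interface removes the need for the KafkaAdminClient cast
           private Admin targetAdminClient;

           // in start():
           targetAdminClient = Admin.create(config.targetAdminConfig());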

##########
File path: connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorCheckpointTaskTest.java
##########
@@ -64,4 +70,58 @@ public void testCheckpoint() {
         assertEquals(13, checkpoint2.downstreamOffset());
         assertEquals(234L, sourceRecord2.timestamp().longValue());
     }
+
+    @Test
+    public void testSyncOffset() {
+        Map<String, Map<TopicPartition, OffsetAndMetadata>> idleConsumerGroupsOffset = new HashMap<>();
+        Map<String, List<Checkpoint>> checkpointsPerConsumerGroup = new HashMap<>();
+
+        String consumer1 = "consumer1";
+        String consumer2 = "consumer2";
+
+        String topic1 = "topic1";
+        String topic2 = "topic2";
+
+        // 'c1t1' denotes consumer offsets of all partitions of topic1 for consumer1
+        Map<TopicPartition, OffsetAndMetadata> c1t1 = new HashMap<>();
+        // 't1p0' denotes topic1, partition 0
+        TopicPartition t1p0 = new TopicPartition(topic1, 0);
+        // 'c1t1p0' denotes the consumer offsets for topic1, partition 0 for consumer1
+        Entry<TopicPartition, OffsetAndMetadata> c1t1p0 = new SimpleEntry<>(t1p0, new OffsetAndMetadata(100, ""));
+
+        c1t1.put(t1p0, new OffsetAndMetadata(100, ""));
+
+        Map<TopicPartition, OffsetAndMetadata> c2t2 = new HashMap<>();
+        TopicPartition t2p0 = new TopicPartition(topic2, 0);
+        Entry<TopicPartition, OffsetAndMetadata> c2t2p0 = new SimpleEntry<>(t2p0, new OffsetAndMetadata(50, ""));

Review comment:
       We can use `new OffsetAndMetadata(50)` if we don't set any metadata. same below
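       Applied to the line above, and assuming the single-argument `OffsetAndMetadata(long)` constructor,
       that would read roughly:

           Entry<TopicPartition, OffsetAndMetadata> c2t2p0 = new SimpleEntry<>(t2p0, new OffsetAndMetadata(50));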

##########
File path: connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorConnectorsIntegrationTest.java
##########
@@ -316,6 +316,69 @@ public void testReplication() throws InterruptedException {
             backup.kafka().consume(NUM_RECORDS_PRODUCED, 2 * RECORD_TRANSFER_DURATION_MS, "primary.test-topic-2").count());
     }
 
+
+    @Test
+    public void testOneWayReplicationWithAutorOffsetSync1() throws InterruptedException {
+
+        // create consumers before starting the connectors so we don't need to wait for discovery
+        Consumer<byte[], byte[]> consumer1 = primary.kafka().createConsumerAndSubscribeTo(Collections.singletonMap(
+            "group.id", "consumer-group-1"), "test-topic-1");
+        consumer1.poll(Duration.ofMillis(500));
+        consumer1.commitSync();
+        consumer1.close();
+
+        // enable automated consumer group offset sync
+        mm2Props.put("sync.group.offsets.enabled", "true");
+        mm2Props.put("sync.group.offsets.interval.seconds", "1");
+        // one way replication from primary to backup
+        mm2Props.put("backup->primary.enabled", "false");
+
+        mm2Config = new MirrorMakerConfig(mm2Props);
+
+        waitUntilMirrorMakerIsRunning(backup, mm2Config, "primary", "backup");
+
+        // sleep 5 seconds to ensure the automated group offset sync is complete
+        time.sleep(5000);
+
+        // create a consumer at the backup cluster with the same consumer group ID to consume one topic
+        Consumer<byte[], byte[]> consumer = backup.kafka().createConsumerAndSubscribeTo(
+            Collections.singletonMap("group.id", "consumer-group-1"), "primary.test-topic-1");
+        ConsumerRecords records = consumer.poll(Duration.ofMillis(500));
+        // the consumer should receive no records, because the offsets of this consumer group
+        // have already been synchronized from primary to backup by the background job, so there are
+        // no more records to consume from the replicated topic by the same consumer group at the backup cluster
+        assertTrue("consumer record size is not zero", records.count() == 0);
+
+        // now create a new topic in primary cluster
+        primary.kafka().createTopic("test-topic-2", NUM_PARTITIONS);
+        backup.kafka().createTopic("primary.test-topic-2", 1);
+        // produce some records to the new topic in primary cluster
+        for (int i = 0; i < NUM_RECORDS_PRODUCED; i++) {
+            primary.kafka().produce("test-topic-2", i % NUM_PARTITIONS, "key", "message-1-" + i);
+        }
+
+        // create a consumer at primary cluster to consume the new topic
+        consumer1 = primary.kafka().createConsumerAndSubscribeTo(Collections.singletonMap(
+            "group.id", "consumer-group-1"), "test-topic-2");
+        consumer1.poll(Duration.ofMillis(500));
+        consumer1.commitSync();
+        consumer1.close();
+
+        // sleep 5 seconds to ensure the automated group offset sync is complete
+        time.sleep(5000);
+
+        // create a consumer at the backup cluster with the same consumer group ID to consume the old and new topics
+        consumer = backup.kafka().createConsumerAndSubscribeTo(Collections.singletonMap(
+            "group.id", "consumer-group-1"), "primary.test-topic-1", "primary.test-topic-2");
+
+        records = consumer.poll(Duration.ofMillis(500));
+        // same reasoning as above: no more records for the same consumer group to consume at the backup cluster
+        assertTrue("consumer record size is not zero", records.count() == 0);

Review comment:
       what about `assertEquals("consumer record size is not zero", 0, records.count());`? It can also be applied in a few other places

##########
File path: connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorCheckpointTaskTest.java
##########
@@ -29,7 +35,7 @@
     @Test
     public void testDownstreamTopicRenaming() {
         MirrorCheckpointTask mirrorCheckpointTask = new MirrorCheckpointTask("source1", "target2",
-            new DefaultReplicationPolicy(), null);
+            new DefaultReplicationPolicy(), null, new HashMap<>(), new HashMap<>());

Review comment:
       We could use `Collections.emptyMap()` here and in a few places below
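       Applied to the constructor call above, roughly (a sketch; `java.util.Collections` would need
       to be imported):

           MirrorCheckpointTask mirrorCheckpointTask = new MirrorCheckpointTask("source1", "target2",
               new DefaultReplicationPolicy(), null, Collections.emptyMap(), Collections.emptyMap());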

##########
File path: connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java
##########
@@ -188,4 +219,101 @@ public void commitRecord(SourceRecord record) {
             Checkpoint.unwrapGroup(record.sourcePartition()),
             System.currentTimeMillis() - record.timestamp());
     }
+
+    private void refreshIdleConsumerGroupOffset() {
+        Map<String, KafkaFuture<ConsumerGroupDescription>> consumerGroupsDesc = targetAdminClient
+            .describeConsumerGroups(consumerGroups).describedGroups();

Review comment:
       It looks like we describe all groups just to get groups in the `EMPTY` state. Can we use the new `listGroups()`  method introduced in [KIP-518](https://cwiki.apache.org/confluence/display/KAFKA/KIP-518%3A+Allow+listing+consumer+groups+per+state) to only get groups in that specific state?
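       A rough sketch of what that could look like with the KIP-518 state filter (the Admin API exposes
       it as `listConsumerGroups` with `ListConsumerGroupsOptions#inStates`; treat this as an
       illustration rather than the actual patch):

           ListConsumerGroupsOptions options = new ListConsumerGroupsOptions()
               .inStates(Collections.singleton(ConsumerGroupState.EMPTY));
           // returns only the groups already in the EMPTY state on the target cluster;
           // get() throws InterruptedException/ExecutionException, handled as in refreshIdleConsumerGroupOffset()
           Collection<ConsumerGroupListing> emptyGroups =
               targetAdminClient.listConsumerGroups(options).all().get();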

##########
File path: connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorConnectorConfig.java
##########
@@ -234,6 +243,17 @@ Duration adminTimeout() {
         return props;
     }
 
+
+    Map<String, Object> targetConsumerConfig() {
+        Map<String, Object> props = new HashMap<>();
+        props.putAll(originalsWithPrefix(TARGET_CLUSTER_PREFIX));
+        props.keySet().retainAll(MirrorClientConfig.CLIENT_CONFIG_DEF.names());
+        props.putAll(originalsWithPrefix(CONSUMER_CLIENT_PREFIX));
+        props.put("enable.auto.commit", "false");

Review comment:
       Can we use the existing constants for the config names?
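       For example, using the existing constant from `ConsumerConfig` (a sketch, assuming the clients
       `ConsumerConfig` class is available in this module):

           props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");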

##########
File path: connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorCheckpointTaskTest.java
##########
@@ -64,4 +70,58 @@ public void testCheckpoint() {
         assertEquals(13, checkpoint2.downstreamOffset());
         assertEquals(234L, sourceRecord2.timestamp().longValue());
     }
+
+    @Test
+    public void testSyncOffset() {
+        Map<String, Map<TopicPartition, OffsetAndMetadata>> idleConsumerGroupsOffset = new HashMap<>();
+        Map<String, List<Checkpoint>> checkpointsPerConsumerGroup = new HashMap<>();
+
+        String consumer1 = "consumer1";
+        String consumer2 = "consumer2";
+
+        String topic1 = "topic1";
+        String topic2 = "topic2";
+
+        // 'c1t1' denotes consumer offsets of all partitions of topic1 for consumer1
+        Map<TopicPartition, OffsetAndMetadata> c1t1 = new HashMap<>();
+        // 't1p0' denotes topic1, partition 0
+        TopicPartition t1p0 = new TopicPartition(topic1, 0);
+        // 'c1t1p0' denotes the consumer offsets for topic1, partition 0 for consumer1
+        Entry<TopicPartition, OffsetAndMetadata> c1t1p0 = new SimpleEntry<>(t1p0, new OffsetAndMetadata(100, ""));

Review comment:
       This looks unused, same below for `c2t2p0`




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org