You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/06/05 05:58:46 UTC

[GitHub] [kafka] ning2008wisc commented on a change in pull request #7577: KAFKA-9076: support consumer offset sync across clusters in MM 2.0

ning2008wisc commented on a change in pull request #7577:
URL: https://github.com/apache/kafka/pull/7577#discussion_r435705249



##########
File path: connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorConnectorsIntegrationTest.java
##########
@@ -316,6 +316,69 @@ public void testReplication() throws InterruptedException {
             backup.kafka().consume(NUM_RECORDS_PRODUCED, 2 * RECORD_TRANSFER_DURATION_MS, "primary.test-topic-2").count());
     }
 
+
+    @Test
+    public void testOneWayReplicationWithAutorOffsetSync1() throws InterruptedException {
+
+        // create consumers before starting the connectors so we don't need to wait for discovery
+        Consumer<byte[], byte[]> consumer1 = primary.kafka().createConsumerAndSubscribeTo(Collections.singletonMap(
+            "group.id", "consumer-group-1"), "test-topic-1");
+        consumer1.poll(Duration.ofMillis(500));
+        consumer1.commitSync();
+        consumer1.close();
+
+        // enable automated consumer group offset sync
+        mm2Props.put("sync.group.offsets.enabled", "true");
+        mm2Props.put("sync.group.offsets.interval.seconds", "1");
+        // one way replication from primary to backup
+        mm2Props.put("backup->primary.enabled", "false");
+
+        mm2Config = new MirrorMakerConfig(mm2Props);
+
+        waitUntilMirrorMakerIsRunning(backup, mm2Config, "primary", "backup");
+
+        // sleep 5 seconds to ensure the automated group offset sync is complete
+        time.sleep(5000);
+
+        // create a consumer at backup cluster with same consumer group Id to consume 1 topic
+        Consumer<byte[], byte[]> consumer = backup.kafka().createConsumerAndSubscribeTo(
+            Collections.singletonMap("group.id", "consumer-group-1"), "primary.test-topic-1");
+        ConsumerRecords records = consumer.poll(Duration.ofMillis(500));
+        // the size of consumer record should be zero, because the offsets of the same consumer group
+        // have been automatically synchronized from primary to backup by the background job, so no
+        // more records to consume from the replicated topic by the same consumer group at backup cluster
+        assertTrue("consumer record size is not zero", records.count() == 0);
+
+        // now create a new topic in primary cluster
+        primary.kafka().createTopic("test-topic-2", NUM_PARTITIONS);
+        backup.kafka().createTopic("primary.test-topic-2", 1);
+        // produce some records to the new topic in primary cluster
+        for (int i = 0; i < NUM_RECORDS_PRODUCED; i++) {
+            primary.kafka().produce("test-topic-2", i % NUM_PARTITIONS, "key", "message-1-" + i);
+        }
+
+        // create a consumer at primary cluster to consume the new topic
+        consumer1 = primary.kafka().createConsumerAndSubscribeTo(Collections.singletonMap(
+            "group.id", "consumer-group-1"), "test-topic-2");
+        consumer1.poll(Duration.ofMillis(500));
+        consumer1.commitSync();
+        consumer1.close();
+
+        // sleep 5 seconds to ensure the automated group offset sync is complete
+        time.sleep(5000);
+
+        // create a consumer at backup cluster with same consumer group Id to consume old and new topic
+        consumer = backup.kafka().createConsumerAndSubscribeTo(Collections.singletonMap(
+            "group.id", "consumer-group-1"), "primary.test-topic-1", "primary.test-topic-2");
+
+        records = consumer.poll(Duration.ofMillis(500));
+        // similar reasoning as above, no more records to consume by the same consumer group at backup cluster
+        assertTrue("consumer record size is not zero", records.count() == 0);

Review comment:
       updated

##########
File path: connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorConnectorsIntegrationTest.java
##########
@@ -207,8 +185,30 @@ public void close() {
         backup.stop();
     }
 
+
+
     @Test
     public void testReplication() throws InterruptedException {
+
+        // create consumers before starting the connectors so we don't need to wait for discovery
+        Consumer<byte[], byte[]> consumer3 = primary.kafka().createConsumerAndSubscribeTo(Collections.singletonMap(

Review comment:
       updated to `consumer1` and `consumer2`

##########
File path: connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorCheckpointTaskTest.java
##########
@@ -64,4 +70,58 @@ public void testCheckpoint() {
         assertEquals(13, checkpoint2.downstreamOffset());
         assertEquals(234L, sourceRecord2.timestamp().longValue());
     }
+
+    @Test
+    public void testSyncOffset() {
+        Map<String, Map<TopicPartition, OffsetAndMetadata>> idleConsumerGroupsOffset = new HashMap<>();
+        Map<String, List<Checkpoint>> checkpointsPerConsumerGroup = new HashMap<>();
+
+        String consumer1 = "consumer1";
+        String consumer2 = "consumer2";
+
+        String topic1 = "topic1";
+        String topic2 = "topic2";
+
+        // 'c1t1' denotes consumer offsets of all partitions of topic1 for consumer1
+        Map<TopicPartition, OffsetAndMetadata> c1t1 = new HashMap<>();
+        // 't1p0' denotes topic1, partition 0
+        TopicPartition t1p0 = new TopicPartition(topic1, 0);
+        // 'c1t1p0' denotes the consumer offsets for topic1, partition 0 for consumer1
+        Entry<TopicPartition, OffsetAndMetadata> c1t1p0 = new SimpleEntry<>(t1p0, new OffsetAndMetadata(100, ""));
+
+        c1t1.put(t1p0, new OffsetAndMetadata(100, ""));
+
+        Map<TopicPartition, OffsetAndMetadata> c2t2 = new HashMap<>();
+        TopicPartition t2p0 = new TopicPartition(topic2, 0);
+        Entry<TopicPartition, OffsetAndMetadata> c2t2p0 = new SimpleEntry<>(t2p0, new OffsetAndMetadata(50, ""));

Review comment:
       updated

##########
File path: connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorCheckpointTaskTest.java
##########
@@ -64,4 +70,58 @@ public void testCheckpoint() {
         assertEquals(13, checkpoint2.downstreamOffset());
         assertEquals(234L, sourceRecord2.timestamp().longValue());
     }
+
+    @Test
+    public void testSyncOffset() {
+        Map<String, Map<TopicPartition, OffsetAndMetadata>> idleConsumerGroupsOffset = new HashMap<>();
+        Map<String, List<Checkpoint>> checkpointsPerConsumerGroup = new HashMap<>();
+
+        String consumer1 = "consumer1";
+        String consumer2 = "consumer2";
+
+        String topic1 = "topic1";
+        String topic2 = "topic2";
+
+        // 'c1t1' denotes consumer offsets of all partitions of topic1 for consumer1
+        Map<TopicPartition, OffsetAndMetadata> c1t1 = new HashMap<>();
+        // 't1p0' denotes topic1, partition 0
+        TopicPartition t1p0 = new TopicPartition(topic1, 0);
+        // 'c1t1p0' denotes the consumer offsets for topic1, partition 0 for consumer1
+        Entry<TopicPartition, OffsetAndMetadata> c1t1p0 = new SimpleEntry<>(t1p0, new OffsetAndMetadata(100, ""));

Review comment:
       removed the unused variable

##########
File path: connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorCheckpointTaskTest.java
##########
@@ -29,7 +35,7 @@
     @Test
     public void testDownstreamTopicRenaming() {
         MirrorCheckpointTask mirrorCheckpointTask = new MirrorCheckpointTask("source1", "target2",
-            new DefaultReplicationPolicy(), null);
+            new DefaultReplicationPolicy(), null, new HashMap<>(), new HashMap<>());

Review comment:
       updated

##########
File path: connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorConnectorConfig.java
##########
@@ -234,6 +243,17 @@ Duration adminTimeout() {
         return props;
     }
 
+
+    Map<String, Object> targetConsumerConfig() {
+        Map<String, Object> props = new HashMap<>();
+        props.putAll(originalsWithPrefix(TARGET_CLUSTER_PREFIX));
+        props.keySet().retainAll(MirrorClientConfig.CLIENT_CONFIG_DEF.names());
+        props.putAll(originalsWithPrefix(CONSUMER_CLIENT_PREFIX));
+        props.put("enable.auto.commit", "false");

Review comment:
       done

##########
File path: connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java
##########
@@ -188,4 +219,101 @@ public void commitRecord(SourceRecord record) {
             Checkpoint.unwrapGroup(record.sourcePartition()),
             System.currentTimeMillis() - record.timestamp());
     }
+
+    private void refreshIdleConsumerGroupOffset() {
+        Map<String, KafkaFuture<ConsumerGroupDescription>> consumerGroupsDesc = targetAdminClient
+            .describeConsumerGroups(consumerGroups).describedGroups();
+
+        for (String group : consumerGroups) {
+            try {
+                ConsumerGroupDescription consumerGroupDesc = consumerGroupsDesc.get(group).get();
+                ConsumerGroupState consumerGroupState = consumerGroupDesc.state();
+                // sync offset to the target cluster only if the state of current consumer group is:
+                // (1) idle: because the consumer at target is not actively consuming the mirrored topic
+                // (2) dead: the new consumer that is recently created at source and never exist at target
+                if (consumerGroupState.equals(ConsumerGroupState.EMPTY)) {
+                    idleConsumerGroupsOffset.put(group, targetAdminClient.listConsumerGroupOffsets(group)
+                        .partitionsToOffsetAndMetadata().get().entrySet().stream().collect(
+                            Collectors.toMap(e -> e.getKey(), e -> e.getValue())));
+                }
+                // new consumer upstream has state "DEAD" and will be identified during the offset sync-up
+            } catch (InterruptedException | ExecutionException e) {
+                log.error("Error querying for consumer group {} on cluster {}.", group, targetClusterAlias, e);
+            }
+        }
+    }
+
+    Map<String, Map<TopicPartition, OffsetAndMetadata>> syncGroupOffset() {
+        Map<String, Map<TopicPartition, OffsetAndMetadata>> offsetToSyncAll = new HashMap<>();
+
+        // first, sync offsets for the idle consumers at target
+        for (Entry<String, Map<TopicPartition, OffsetAndMetadata>> group : getConvertedUpstreamOffset().entrySet()) {
+            String consumerGroupId = group.getKey();
+            // for each idle consumer at target, read the checkpoints (converted upstream offset)
+            // from the pre-populated map
+            Map<TopicPartition, OffsetAndMetadata> convertedUpstreamOffset = group.getValue();
+
+            Map<TopicPartition, OffsetAndMetadata> offsetToSync = new HashMap<>();
+            Map<TopicPartition, OffsetAndMetadata> targetConsumerOffset = idleConsumerGroupsOffset.get(consumerGroupId);
+            if (targetConsumerOffset == null) {
+                // this is a new consumer, just sync the offset to target
+                syncGroupOffset(consumerGroupId, convertedUpstreamOffset);
+                offsetToSyncAll.put(consumerGroupId, convertedUpstreamOffset);
+                continue;
+            }
+
+            for (Entry<TopicPartition, OffsetAndMetadata> convertedEntry : convertedUpstreamOffset.entrySet()) {
+
+                TopicPartition topicPartition = convertedEntry.getKey();
+                OffsetAndMetadata convertedOffset = convertedUpstreamOffset.get(topicPartition);
+                if (!targetConsumerOffset.containsKey(topicPartition)) {
+                    // if is a new topicPartition from upstream, just sync the offset to target
+                    offsetToSync.put(topicPartition, convertedOffset);
+                    continue;
+                }
+
+                // if translated offset from upstream is smaller than the current consumer offset
+                // in the target, skip updating the offset for that partition
+                long latestDownstreamOffset = targetConsumerOffset.get(topicPartition).offset();
+                if (latestDownstreamOffset >= convertedOffset.offset()) {
+                    log.trace("latestDownstreamOffset {} is larger than convertedUpstreamOffset {} for "

Review comment:
       updated to `latestDownstreamOffset {} is larger than or equal to convertedUpstreamOffset {} for....`

##########
File path: connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java
##########
@@ -188,4 +219,101 @@ public void commitRecord(SourceRecord record) {
             Checkpoint.unwrapGroup(record.sourcePartition()),
             System.currentTimeMillis() - record.timestamp());
     }
+
+    private void refreshIdleConsumerGroupOffset() {
+        Map<String, KafkaFuture<ConsumerGroupDescription>> consumerGroupsDesc = targetAdminClient
+            .describeConsumerGroups(consumerGroups).describedGroups();

Review comment:
       Great to know about that KIP; I will keep using `describeConsumerGroups()` here.

##########
File path: connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java
##########
@@ -79,7 +93,15 @@ public void start(Map<String, String> props) {
         pollTimeout = config.consumerPollTimeout();
         offsetSyncStore = new OffsetSyncStore(config);
         sourceAdminClient = AdminClient.create(config.sourceAdminConfig());
+        targetAdminClient = (KafkaAdminClient) Admin.create(config.targetAdminConfig());

Review comment:
       updated

##########
File path: connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java
##########
@@ -16,6 +16,13 @@
  */
 package org.apache.kafka.connect.mirror;
 
+import java.util.HashMap;

Review comment:
       done




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org