You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by "C0urante (via GitHub)" <gi...@apache.org> on 2023/02/13 16:02:00 UTC

[GitHub] [kafka] C0urante commented on a diff in pull request #13178: KAFKA-12468, KAFKA-13659, KAFKA-12566: Fix MM2 causing negative downstream lag

C0urante commented on code in PR #13178:
URL: https://github.com/apache/kafka/pull/13178#discussion_r1104664513


##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java:
##########
@@ -16,40 +16,76 @@
  */
 package org.apache.kafka.connect.mirror;
 
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.WakeupException;
 import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.connect.util.KafkaBasedLog;
+import org.apache.kafka.connect.util.TopicAdmin;
 
 import java.util.Map;
 import java.util.HashMap;
-import java.util.Collections;
 import java.time.Duration;
 import java.util.Optional;
 import java.util.OptionalLong;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 
 /** Used internally by MirrorMaker. Stores offset syncs and performs offset translation. */
 class OffsetSyncStore implements AutoCloseable {
-    private final KafkaConsumer<byte[], byte[]> consumer;
+    private final KafkaBasedLog<byte[], byte[]> backingStore;
     private final Map<TopicPartition, OffsetSync> offsetSyncs = new HashMap<>();
-    private final TopicPartition offsetSyncTopicPartition;
+    private final TopicAdmin admin;
 
     OffsetSyncStore(MirrorCheckpointConfig config) {
-        consumer = new KafkaConsumer<>(config.offsetSyncsTopicConsumerConfig(),
-            new ByteArrayDeserializer(), new ByteArrayDeserializer());
-        offsetSyncTopicPartition = new TopicPartition(config.offsetSyncsTopic(), 0);
-        consumer.assign(Collections.singleton(offsetSyncTopicPartition));
+        String topic = config.offsetSyncsTopic();
+        Consumer<byte[], byte[]> consumer = new KafkaConsumer<>(
+                config.offsetSyncsTopicConsumerConfig(),
+                new ByteArrayDeserializer(),
+                new ByteArrayDeserializer());
+        this.admin = new TopicAdmin(
+                config.offsetSyncsTopicAdminConfig().get(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG),
+                config.forwardingAdmin(config.offsetSyncsTopicAdminConfig()));
+        KafkaBasedLog<byte[], byte[]> store = null;
+        try {
+            store = KafkaBasedLog.withExistingClients(
+                    topic,
+                    consumer,
+                    null,
+                    new TopicAdmin(
+                            config.offsetSyncsTopicAdminConfig().get(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG),
+                            config.forwardingAdmin(config.offsetSyncsTopicAdminConfig())),
+                    (error, record) -> this.handleRecord(record),
+                    Time.SYSTEM,
+                    ignored -> {
+                    });
+            store.start();
+        } catch (Throwable t) {
+            Utils.closeQuietly(store != null ? store::stop : null, "backing store");

Review Comment:
   If the `KafkaBasedLog` constructor for `store` fails, won't `store` be null, causing `consumer` to be leaked?



##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java:
##########
@@ -76,26 +112,26 @@ OptionalLong translateDownstream(TopicPartition sourceTopicPartition, long upstr
     }
 
     // poll and handle records
-    synchronized void update(Duration pollTimeout) {
+    synchronized void update(Duration pollTimeout) throws TimeoutException {
         try {
-            consumer.poll(pollTimeout).forEach(this::handleRecord);
-        } catch (WakeupException e) {
+            backingStore.readToEnd().get(pollTimeout.toMillis(), TimeUnit.MILLISECONDS);
+        } catch (WakeupException | InterruptedException | ExecutionException e) {
             // swallow
         }
     }
 
     public synchronized void close() {
-        consumer.wakeup();
-        Utils.closeQuietly(consumer, "offset sync store consumer");
+        Utils.closeQuietly(backingStore::stop, "offset sync store kafka based log");

Review Comment:
   Possible NPE:
   ```suggestion
           Utils.closeQuietly(backingStore != null ? backingStore::stop : null, "offset sync store kafka based log");
   ```



##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java:
##########
@@ -16,40 +16,76 @@
  */
 package org.apache.kafka.connect.mirror;
 
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.WakeupException;
 import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.connect.util.KafkaBasedLog;
+import org.apache.kafka.connect.util.TopicAdmin;
 
 import java.util.Map;
 import java.util.HashMap;
-import java.util.Collections;
 import java.time.Duration;
 import java.util.Optional;
 import java.util.OptionalLong;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 
 /** Used internally by MirrorMaker. Stores offset syncs and performs offset translation. */
 class OffsetSyncStore implements AutoCloseable {
-    private final KafkaConsumer<byte[], byte[]> consumer;
+    private final KafkaBasedLog<byte[], byte[]> backingStore;
     private final Map<TopicPartition, OffsetSync> offsetSyncs = new HashMap<>();
-    private final TopicPartition offsetSyncTopicPartition;
+    private final TopicAdmin admin;
 
     OffsetSyncStore(MirrorCheckpointConfig config) {
-        consumer = new KafkaConsumer<>(config.offsetSyncsTopicConsumerConfig(),
-            new ByteArrayDeserializer(), new ByteArrayDeserializer());
-        offsetSyncTopicPartition = new TopicPartition(config.offsetSyncsTopic(), 0);
-        consumer.assign(Collections.singleton(offsetSyncTopicPartition));
+        String topic = config.offsetSyncsTopic();
+        Consumer<byte[], byte[]> consumer = new KafkaConsumer<>(
+                config.offsetSyncsTopicConsumerConfig(),
+                new ByteArrayDeserializer(),
+                new ByteArrayDeserializer());
+        this.admin = new TopicAdmin(
+                config.offsetSyncsTopicAdminConfig().get(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG),
+                config.forwardingAdmin(config.offsetSyncsTopicAdminConfig()));
+        KafkaBasedLog<byte[], byte[]> store = null;
+        try {
+            store = KafkaBasedLog.withExistingClients(
+                    topic,
+                    consumer,
+                    null,
+                    new TopicAdmin(
+                            config.offsetSyncsTopicAdminConfig().get(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG),
+                            config.forwardingAdmin(config.offsetSyncsTopicAdminConfig())),
+                    (error, record) -> this.handleRecord(record),
+                    Time.SYSTEM,
+                    ignored -> {
+                    });
+            store.start();
+        } catch (Throwable t) {
+            Utils.closeQuietly(store != null ? store::stop : null, "backing store");

Review Comment:
   Also, could we use more descriptive names for these objects? For example, this one could be "backing store for offset syncs".



##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java:
##########
@@ -16,40 +16,76 @@
  */
 package org.apache.kafka.connect.mirror;
 
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.WakeupException;
 import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.connect.util.KafkaBasedLog;
+import org.apache.kafka.connect.util.TopicAdmin;
 
 import java.util.Map;
 import java.util.HashMap;
-import java.util.Collections;
 import java.time.Duration;
 import java.util.Optional;
 import java.util.OptionalLong;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 
 /** Used internally by MirrorMaker. Stores offset syncs and performs offset translation. */
 class OffsetSyncStore implements AutoCloseable {
-    private final KafkaConsumer<byte[], byte[]> consumer;
+    private final KafkaBasedLog<byte[], byte[]> backingStore;
     private final Map<TopicPartition, OffsetSync> offsetSyncs = new HashMap<>();
-    private final TopicPartition offsetSyncTopicPartition;
+    private final TopicAdmin admin;
 
     OffsetSyncStore(MirrorCheckpointConfig config) {
-        consumer = new KafkaConsumer<>(config.offsetSyncsTopicConsumerConfig(),
-            new ByteArrayDeserializer(), new ByteArrayDeserializer());
-        offsetSyncTopicPartition = new TopicPartition(config.offsetSyncsTopic(), 0);
-        consumer.assign(Collections.singleton(offsetSyncTopicPartition));
+        String topic = config.offsetSyncsTopic();
+        Consumer<byte[], byte[]> consumer = new KafkaConsumer<>(
+                config.offsetSyncsTopicConsumerConfig(),
+                new ByteArrayDeserializer(),
+                new ByteArrayDeserializer());

Review Comment:
   Nit: we have a utility method for this
   ```suggestion
           Consumer<byte[], byte[]> consumer = MirrorUtils.newConsumer(config.offsetSyncsTopicConsumerConfig());
   ```



##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java:
##########
@@ -16,40 +16,76 @@
  */
 package org.apache.kafka.connect.mirror;
 
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.WakeupException;
 import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.connect.util.KafkaBasedLog;
+import org.apache.kafka.connect.util.TopicAdmin;
 
 import java.util.Map;
 import java.util.HashMap;
-import java.util.Collections;
 import java.time.Duration;
 import java.util.Optional;
 import java.util.OptionalLong;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 
 /** Used internally by MirrorMaker. Stores offset syncs and performs offset translation. */
 class OffsetSyncStore implements AutoCloseable {
-    private final KafkaConsumer<byte[], byte[]> consumer;
+    private final KafkaBasedLog<byte[], byte[]> backingStore;
     private final Map<TopicPartition, OffsetSync> offsetSyncs = new HashMap<>();
-    private final TopicPartition offsetSyncTopicPartition;
+    private final TopicAdmin admin;
 
     OffsetSyncStore(MirrorCheckpointConfig config) {
-        consumer = new KafkaConsumer<>(config.offsetSyncsTopicConsumerConfig(),
-            new ByteArrayDeserializer(), new ByteArrayDeserializer());
-        offsetSyncTopicPartition = new TopicPartition(config.offsetSyncsTopic(), 0);

Review Comment:
   This leads to a change in behavior since we'll end up consuming from all partitions in the offset syncs topic instead of just partition 0.
   
   We intentionally [write every offset sync to partition zero](https://github.com/apache/kafka/blob/8cfafba2794562840b0f1c537e304f084b9359cf/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceTask.java#L249) and [create the topic with a single partition](https://github.com/apache/kafka/blob/8cfafba2794562840b0f1c537e304f084b9359cf/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java#L361), but the topic may have been created out-of-band and there may be other information in it which has not been produced by MM2 that we shouldn't consume.
   
   Could we expand the `KafkaBasedLog` API to support reading from a specific subset of the partitions for a topic, possibly by adding a `protected List<TopicPartition> assignedPartitions(List<PartitionInfo> partitionInfos)` method that can be overridden by subclasses? This would allow us to completely preserve the existing behavior.



##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java:
##########
@@ -76,26 +112,26 @@ OptionalLong translateDownstream(TopicPartition sourceTopicPartition, long upstr
     }
 
     // poll and handle records
-    synchronized void update(Duration pollTimeout) {
+    synchronized void update(Duration pollTimeout) throws TimeoutException {
         try {
-            consumer.poll(pollTimeout).forEach(this::handleRecord);
-        } catch (WakeupException e) {
+            backingStore.readToEnd().get(pollTimeout.toMillis(), TimeUnit.MILLISECONDS);
+        } catch (WakeupException | InterruptedException | ExecutionException e) {
             // swallow

Review Comment:
   Won't this cause us to emit stale offset syncs in `MirrorCheckpointTask::poll` if we encounter some types of failure here?



##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java:
##########
@@ -76,26 +112,26 @@ OptionalLong translateDownstream(TopicPartition sourceTopicPartition, long upstr
     }
 
     // poll and handle records
-    synchronized void update(Duration pollTimeout) {
+    synchronized void update(Duration pollTimeout) throws TimeoutException {
         try {
-            consumer.poll(pollTimeout).forEach(this::handleRecord);
-        } catch (WakeupException e) {
+            backingStore.readToEnd().get(pollTimeout.toMillis(), TimeUnit.MILLISECONDS);
+        } catch (WakeupException | InterruptedException | ExecutionException e) {
             // swallow

Review Comment:
   I haven't looked too closely into this, but one possibility could be to return `OptionalLong.empty()` from `OffsetSyncStore::translateDownstream` if we haven't completed a read to the end of the offset syncs topic yet?



##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java:
##########
@@ -16,40 +16,76 @@
  */
 package org.apache.kafka.connect.mirror;
 
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.WakeupException;
 import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.connect.util.KafkaBasedLog;
+import org.apache.kafka.connect.util.TopicAdmin;
 
 import java.util.Map;
 import java.util.HashMap;
-import java.util.Collections;
 import java.time.Duration;
 import java.util.Optional;
 import java.util.OptionalLong;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 
 /** Used internally by MirrorMaker. Stores offset syncs and performs offset translation. */
 class OffsetSyncStore implements AutoCloseable {
-    private final KafkaConsumer<byte[], byte[]> consumer;
+    private final KafkaBasedLog<byte[], byte[]> backingStore;
     private final Map<TopicPartition, OffsetSync> offsetSyncs = new HashMap<>();
-    private final TopicPartition offsetSyncTopicPartition;
+    private final TopicAdmin admin;
 
     OffsetSyncStore(MirrorCheckpointConfig config) {
-        consumer = new KafkaConsumer<>(config.offsetSyncsTopicConsumerConfig(),
-            new ByteArrayDeserializer(), new ByteArrayDeserializer());
-        offsetSyncTopicPartition = new TopicPartition(config.offsetSyncsTopic(), 0);
-        consumer.assign(Collections.singleton(offsetSyncTopicPartition));
+        String topic = config.offsetSyncsTopic();
+        Consumer<byte[], byte[]> consumer = new KafkaConsumer<>(
+                config.offsetSyncsTopicConsumerConfig(),
+                new ByteArrayDeserializer(),
+                new ByteArrayDeserializer());
+        this.admin = new TopicAdmin(
+                config.offsetSyncsTopicAdminConfig().get(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG),
+                config.forwardingAdmin(config.offsetSyncsTopicAdminConfig()));
+        KafkaBasedLog<byte[], byte[]> store = null;
+        try {
+            store = KafkaBasedLog.withExistingClients(
+                    topic,
+                    consumer,
+                    null,
+                    new TopicAdmin(
+                            config.offsetSyncsTopicAdminConfig().get(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG),
+                            config.forwardingAdmin(config.offsetSyncsTopicAdminConfig())),
+                    (error, record) -> this.handleRecord(record),
+                    Time.SYSTEM,
+                    ignored -> {
+                    });
+            store.start();
+        } catch (Throwable t) {
+            Utils.closeQuietly(store != null ? store::stop : null, "backing store");
+            Utils.closeQuietly(new TopicAdmin(
+                    config.offsetSyncsTopicAdminConfig().get(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG),
+                    config.forwardingAdmin(config.offsetSyncsTopicAdminConfig())), "admin client");
+            throw t;
+        }
+        this.backingStore = store;
+    }
+
+    OffsetSyncStore() {
+        this.admin = null;
+        this.backingStore = null;
     }
 
-    // for testing
-    OffsetSyncStore(KafkaConsumer<byte[], byte[]> consumer, TopicPartition offsetSyncTopicPartition) {
-        this.consumer = consumer;
-        this.offsetSyncTopicPartition = offsetSyncTopicPartition;
+    public void readToEnd(Runnable callback) {
+        backingStore.readToEnd((error, result) -> callback.run());
     }
 
-    OptionalLong translateDownstream(TopicPartition sourceTopicPartition, long upstreamOffset) {
+    synchronized OptionalLong translateDownstream(TopicPartition sourceTopicPartition, long upstreamOffset) {

Review Comment:
   Why are we synchronizing here? Would it be enough to change `offsetSyncs` to be a `ConcurrentMap`?



##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java:
##########
@@ -16,40 +16,76 @@
  */
 package org.apache.kafka.connect.mirror;
 
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.WakeupException;
 import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.connect.util.KafkaBasedLog;
+import org.apache.kafka.connect.util.TopicAdmin;
 
 import java.util.Map;
 import java.util.HashMap;
-import java.util.Collections;
 import java.time.Duration;
 import java.util.Optional;
 import java.util.OptionalLong;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 
 /** Used internally by MirrorMaker. Stores offset syncs and performs offset translation. */
 class OffsetSyncStore implements AutoCloseable {
-    private final KafkaConsumer<byte[], byte[]> consumer;
+    private final KafkaBasedLog<byte[], byte[]> backingStore;
     private final Map<TopicPartition, OffsetSync> offsetSyncs = new HashMap<>();
-    private final TopicPartition offsetSyncTopicPartition;
+    private final TopicAdmin admin;
 
     OffsetSyncStore(MirrorCheckpointConfig config) {
-        consumer = new KafkaConsumer<>(config.offsetSyncsTopicConsumerConfig(),
-            new ByteArrayDeserializer(), new ByteArrayDeserializer());
-        offsetSyncTopicPartition = new TopicPartition(config.offsetSyncsTopic(), 0);
-        consumer.assign(Collections.singleton(offsetSyncTopicPartition));
+        String topic = config.offsetSyncsTopic();
+        Consumer<byte[], byte[]> consumer = new KafkaConsumer<>(
+                config.offsetSyncsTopicConsumerConfig(),
+                new ByteArrayDeserializer(),
+                new ByteArrayDeserializer());
+        this.admin = new TopicAdmin(
+                config.offsetSyncsTopicAdminConfig().get(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG),
+                config.forwardingAdmin(config.offsetSyncsTopicAdminConfig()));
+        KafkaBasedLog<byte[], byte[]> store = null;
+        try {
+            store = KafkaBasedLog.withExistingClients(
+                    topic,
+                    consumer,
+                    null,
+                    new TopicAdmin(
+                            config.offsetSyncsTopicAdminConfig().get(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG),
+                            config.forwardingAdmin(config.offsetSyncsTopicAdminConfig())),
+                    (error, record) -> this.handleRecord(record),
+                    Time.SYSTEM,
+                    ignored -> {
+                    });
+            store.start();

Review Comment:
   This will trigger a [synchronous read to the end of the topic](https://github.com/apache/kafka/blob/8cfafba2794562840b0f1c537e304f084b9359cf/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java#L269).
   
   Perhaps we could introduce a separate method to "start" the store that invokes `start` on the underlying `KafkaBasedLog`? This method could be run on the `MirrorCheckpointTask`'s `scheduler` so as not to block startup.



##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java:
##########
@@ -16,40 +16,76 @@
  */
 package org.apache.kafka.connect.mirror;
 
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.WakeupException;
 import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.connect.util.KafkaBasedLog;
+import org.apache.kafka.connect.util.TopicAdmin;
 
 import java.util.Map;
 import java.util.HashMap;
-import java.util.Collections;
 import java.time.Duration;
 import java.util.Optional;
 import java.util.OptionalLong;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 
 /** Used internally by MirrorMaker. Stores offset syncs and performs offset translation. */
 class OffsetSyncStore implements AutoCloseable {
-    private final KafkaConsumer<byte[], byte[]> consumer;
+    private final KafkaBasedLog<byte[], byte[]> backingStore;
     private final Map<TopicPartition, OffsetSync> offsetSyncs = new HashMap<>();
-    private final TopicPartition offsetSyncTopicPartition;
+    private final TopicAdmin admin;
 
     OffsetSyncStore(MirrorCheckpointConfig config) {
-        consumer = new KafkaConsumer<>(config.offsetSyncsTopicConsumerConfig(),
-            new ByteArrayDeserializer(), new ByteArrayDeserializer());
-        offsetSyncTopicPartition = new TopicPartition(config.offsetSyncsTopic(), 0);
-        consumer.assign(Collections.singleton(offsetSyncTopicPartition));
+        String topic = config.offsetSyncsTopic();
+        Consumer<byte[], byte[]> consumer = new KafkaConsumer<>(
+                config.offsetSyncsTopicConsumerConfig(),
+                new ByteArrayDeserializer(),
+                new ByteArrayDeserializer());
+        this.admin = new TopicAdmin(
+                config.offsetSyncsTopicAdminConfig().get(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG),
+                config.forwardingAdmin(config.offsetSyncsTopicAdminConfig()));
+        KafkaBasedLog<byte[], byte[]> store = null;
+        try {
+            store = KafkaBasedLog.withExistingClients(
+                    topic,
+                    consumer,
+                    null,
+                    new TopicAdmin(
+                            config.offsetSyncsTopicAdminConfig().get(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG),
+                            config.forwardingAdmin(config.offsetSyncsTopicAdminConfig())),
+                    (error, record) -> this.handleRecord(record),
+                    Time.SYSTEM,
+                    ignored -> {
+                    });
+            store.start();
+        } catch (Throwable t) {
+            Utils.closeQuietly(store != null ? store::stop : null, "backing store");
+            Utils.closeQuietly(new TopicAdmin(
+                    config.offsetSyncsTopicAdminConfig().get(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG),
+                    config.forwardingAdmin(config.offsetSyncsTopicAdminConfig())), "admin client");

Review Comment:
   Should this be:
   ```suggestion
               Utils.closeQuietly(admin, "admin client");
   ```
   ?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org