Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/06/23 05:16:34 UTC

[GitHub] [kafka] ccding commented on a change in pull request #10579: KAFKA-9555 Added default RLMM implementation based on internal topic storage.

ccding commented on a change in pull request #10579:
URL: https://github.com/apache/kafka/pull/10579#discussion_r656735781



##########
File path: storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerManager.java
##########
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.metadata.storage;
+
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.TopicIdPartition;
+import org.apache.kafka.common.utils.KafkaThread;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Optional;
+import java.util.Set;
+
+/**
+ * This class manages the consumer thread ({@link ConsumerTask}) that polls messages from the assigned metadata topic partitions.
+ * It also provides a way to wait until a given record is received by the consumer, timing out after
+ * {@link TopicBasedRemoteLogMetadataManagerConfig#consumeWaitMs()}.
+ */
+public class ConsumerManager implements Closeable {
+
+    private static final Logger log = LoggerFactory.getLogger(ConsumerManager.class);
+    private static final long CONSUME_RECHECK_INTERVAL_MS = 50L;
+
+    private final TopicBasedRemoteLogMetadataManagerConfig rlmmConfig;
+    private final Time time;
+    private final ConsumerTask consumerTask;
+    private final Thread consumerTaskThread;
+
+    public ConsumerManager(TopicBasedRemoteLogMetadataManagerConfig rlmmConfig,
+                           RemotePartitionMetadataEventHandler remotePartitionMetadataEventHandler,
+                           RemoteLogMetadataTopicPartitioner rlmmTopicPartitioner,
+                           Time time) {
+        this.rlmmConfig = rlmmConfig;
+        this.time = time;
+
+        // Create a task to consume messages and submit the respective events to RemotePartitionMetadataEventHandler.
+        KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(rlmmConfig.consumerProperties());
+        consumerTask = new ConsumerTask(consumer, remotePartitionMetadataEventHandler, rlmmTopicPartitioner);
+        consumerTaskThread = KafkaThread.daemon("RLMMConsumerTask", consumerTask);
+    }
+
+    public void startConsumerThread() {
+        try {
+            // Start a thread to continuously consume records from topic partitions.
+            consumerTaskThread.start();
+        } catch (Exception e) {
+            throw new KafkaException("Error encountered while initializing and scheduling ConsumerTask thread", e);
+        }
+    }
+
+    /**
+     * Wait until the consumption reaches the offset of the metadata partition for the given {@code recordMetadata}.
+     *
+     * @param recordMetadata record metadata to be checked for consumption.
+     */
+    public void waitTillConsumptionCatchesUp(RecordMetadata recordMetadata) {
+        final int partition = recordMetadata.partition();
+
+        // If the current assignment does not have the subscription for this partition then return immediately.
+        if (!assignedPartition(partition)) {
+            log.warn("This consumer is not subscribed to the target partition [{}] on which message is produced.",
+                    partition);
+            return;
+        }
+
+        final long offset = recordMetadata.offset();
+        long startTimeMillis = time.milliseconds();
+        while (true) {
+            long committedOffset = committedOffset(partition);
+            if (committedOffset >= offset) {
+                break;
+            }
+
+            log.debug("Committed offset [{}] for partition [{}], but the target offset: [{}],  Sleeping for [{}] to retry again",
+                    offset, partition, committedOffset, CONSUME_RECHECK_INTERVAL_MS);
+
+            if (time.milliseconds() - startTimeMillis > rlmmConfig.consumeWaitMs()) {
+                log.warn("Committed offset [{}] for partition [{}] did not reach the target offset [{}] within the configured wait time",
+                        committedOffset, partition, offset);
+                return;
+            }
+
+            time.sleep(CONSUME_RECHECK_INTERVAL_MS);
+        }
+    }
+
+    private long committedOffset(int partition) {
+        return consumerTask.receivedOffsetForPartition(partition).orElse(-1L);
+    }
+
+    private boolean assignedPartition(int partition) {
+        return consumerTask.assignedPartition(partition);
+    }
+
+    @Override
+    public void close() throws IOException {
+        // Closing the consumer task internally closes all of its resources, including the consumer.
+        Utils.closeQuietly(consumerTask, "ConsumerTask");
+
+        // Wait until the consumer thread finishes.
+        try {
+            consumerTaskThread.join();
+        } catch (Exception e) {
+            log.error("Encountered error while waiting for consumerTaskThread to finish.", e);
+        }
+    }
+
+    public void addAssignmentsForPartitions(HashSet<TopicIdPartition> allPartitions) {
+        consumerTask.addAssignmentsForPartitions(allPartitions);
+    }
+
+    public void removeAssignmentsForPartitions(Set<TopicIdPartition> partitions) {

Review comment:
       Why is the variable named `partitions`, while the one above in `addAssignmentsForPartitions` is named `allPartitions`?
   
   Also, why is one a `Set` and the other a `HashSet`?
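
   For instance, declaring both parameters against the `Set` interface would keep the two methods consistent and let callers pass any `Set` implementation — a minimal sketch of what I mean (same bodies as in the PR):

```java
// Hypothetical signatures: both take the Set interface rather than HashSet,
// so a HashSet, TreeSet, or unmodifiable set all work as arguments.
public void addAssignmentsForPartitions(Set<TopicIdPartition> partitions) {
    consumerTask.addAssignmentsForPartitions(partitions);
}

public void removeAssignmentsForPartitions(Set<TopicIdPartition> partitions) {
    consumerTask.removeAssignmentsForPartitions(partitions);
}
```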

##########
File path: storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerManager.java
##########
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.metadata.storage;
+
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.TopicIdPartition;
+import org.apache.kafka.common.utils.KafkaThread;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Optional;
+import java.util.Set;
+
+/**
+ * This class manages the consumer thread ({@link ConsumerTask}) that polls messages from the assigned metadata topic partitions.
+ * It also provides a way to wait until a given record is received by the consumer, timing out after
+ * {@link TopicBasedRemoteLogMetadataManagerConfig#consumeWaitMs()}.
+ */
+public class ConsumerManager implements Closeable {
+
+    private static final Logger log = LoggerFactory.getLogger(ConsumerManager.class);
+    private static final long CONSUME_RECHECK_INTERVAL_MS = 50L;
+
+    private final TopicBasedRemoteLogMetadataManagerConfig rlmmConfig;
+    private final Time time;
+    private final ConsumerTask consumerTask;
+    private final Thread consumerTaskThread;
+
+    public ConsumerManager(TopicBasedRemoteLogMetadataManagerConfig rlmmConfig,
+                           RemotePartitionMetadataEventHandler remotePartitionMetadataEventHandler,
+                           RemoteLogMetadataTopicPartitioner rlmmTopicPartitioner,
+                           Time time) {
+        this.rlmmConfig = rlmmConfig;
+        this.time = time;
+
+        // Create a task to consume messages and submit the respective events to RemotePartitionMetadataEventHandler.
+        KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(rlmmConfig.consumerProperties());
+        consumerTask = new ConsumerTask(consumer, remotePartitionMetadataEventHandler, rlmmTopicPartitioner);
+        consumerTaskThread = KafkaThread.daemon("RLMMConsumerTask", consumerTask);
+    }
+
+    public void startConsumerThread() {
+        try {
+            // Start a thread to continuously consume records from topic partitions.
+            consumerTaskThread.start();
+        } catch (Exception e) {
+            throw new KafkaException("Error encountered while initializing and scheduling ConsumerTask thread", e);
+        }
+    }
+
+    /**
+     * Wait until the consumption reaches the offset of the metadata partition for the given {@code recordMetadata}.
+     *
+     * @param recordMetadata record metadata to be checked for consumption.
+     */
+    public void waitTillConsumptionCatchesUp(RecordMetadata recordMetadata) {
+        final int partition = recordMetadata.partition();
+
+        // If the current assignment does not have the subscription for this partition then return immediately.
+        if (!assignedPartition(partition)) {
+            log.warn("This consumer is not subscribed to the target partition [{}] on which message is produced.",
+                    partition);
+            return;
+        }
+
+        final long offset = recordMetadata.offset();
+        long startTimeMillis = time.milliseconds();
+        while (true) {
+            long committedOffset = committedOffset(partition);

Review comment:
       Can we avoid making the variable name the same as the function name?
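
   e.g., a one-line rename would avoid the shadowing — `consumedOffset` below is a hypothetical name:

```java
// The local variable no longer shadows the committedOffset(int) helper method.
long consumedOffset = committedOffset(partition);
if (consumedOffset >= offset) {
    break;
}
```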

##########
File path: storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerManager.java
##########
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.metadata.storage;
+
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.TopicIdPartition;
+import org.apache.kafka.common.utils.KafkaThread;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Optional;
+import java.util.Set;
+
+/**
+ * This class manages the consumer thread viz {@link ConsumerTask} that polls messages from the assigned metadata topic partitions.
+ * It also provides a way to wait until the given record is received by the consumer before it is timed out with an interval of
+ * {@link TopicBasedRemoteLogMetadataManagerConfig#consumeWaitMs()}.
+ */
+public class ConsumerManager implements Closeable {
+
+    private static final Logger log = LoggerFactory.getLogger(ConsumerManager.class);
+    private static final long CONSUME_RECHECK_INTERVAL_MS = 50L;

Review comment:
       Should this be fixed, or made configurable via `rlmmConfig`?
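
   If it were made configurable, a sketch along these lines could work — note that `remote.log.metadata.consume.recheck.interval.ms` is a made-up property name, not an existing config:

```java
import java.util.Properties;

// Hypothetical sketch: back the recheck interval with a config property,
// defaulting to the current hard-coded 50 ms.
final class RecheckInterval {
    static final String CONSUME_RECHECK_INTERVAL_MS_PROP = "remote.log.metadata.consume.recheck.interval.ms";
    static final long DEFAULT_CONSUME_RECHECK_INTERVAL_MS = 50L;

    static long fromProperties(Properties props) {
        String value = props.getProperty(CONSUME_RECHECK_INTERVAL_MS_PROP);
        return value == null ? DEFAULT_CONSUME_RECHECK_INTERVAL_MS : Long.parseLong(value);
    }
}
```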

##########
File path: storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTask.java
##########
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.metadata.storage;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.TopicIdPartition;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.server.log.remote.metadata.storage.serialization.RemoteLogMetadataSerde;
+import org.apache.kafka.server.log.remote.storage.RemoteLogMetadata;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.time.Duration;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+
+import static org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_TOPIC_NAME;
+
+/**
+ * This class is responsible for consuming messages from the remote log metadata topic ({@link TopicBasedRemoteLogMetadataManagerConfig#REMOTE_LOG_METADATA_TOPIC_NAME})
+ * partitions and maintaining the state of the remote log segment metadata. It provides an API to add or remove
+ * the topic partitions whose metadata should be consumed by this instance, using
+ * {@link #addAssignmentsForPartitions(Set)} and {@link #removeAssignmentsForPartitions(Set)} respectively.
+ * <p>
+ * When a broker is started, the controller sends the topic partitions that this broker is a leader or follower for, along with the
+ * partitions to be deleted. This class receives those notifications through
+ * {@link #addAssignmentsForPartitions(Set)} and {@link #removeAssignmentsForPartitions(Set)}, and assigns the consumer to the
+ * respective remote log metadata partitions using {@link RemoteLogMetadataTopicPartitioner#metadataPartition(TopicIdPartition)}.
+ * Any later leadership changes arrive through the same APIs. Partitions that are deleted from
+ * this broker are removed through {@link #removeAssignmentsForPartitions(Set)}.
+ * <p>
+ * After receiving these events, it invokes {@link RemotePartitionMetadataEventHandler#handleRemoteLogSegmentMetadata(RemoteLogSegmentMetadata)},
+ * which maintains an in-memory representation of the state of {@link RemoteLogSegmentMetadata}.
+ */
+class ConsumerTask implements Runnable, Closeable {
+    private static final Logger log = LoggerFactory.getLogger(ConsumerTask.class);
+
+    private static final long POLL_INTERVAL_MS = 30L;

Review comment:
       why is this set to 30, rather than another number like 10, 50, 100?

##########
File path: storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTask.java
##########
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.metadata.storage;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.TopicIdPartition;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.server.log.remote.metadata.storage.serialization.RemoteLogMetadataSerde;
+import org.apache.kafka.server.log.remote.storage.RemoteLogMetadata;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.time.Duration;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+
+import static org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_TOPIC_NAME;
+
+/**
+ * This class is responsible for consuming messages from the remote log metadata topic ({@link TopicBasedRemoteLogMetadataManagerConfig#REMOTE_LOG_METADATA_TOPIC_NAME})
+ * partitions and maintaining the state of the remote log segment metadata. It provides an API to add or remove
+ * the topic partitions whose metadata should be consumed by this instance, using
+ * {@link #addAssignmentsForPartitions(Set)} and {@link #removeAssignmentsForPartitions(Set)} respectively.
+ * <p>
+ * When a broker is started, the controller sends the topic partitions that this broker is a leader or follower for, along with the
+ * partitions to be deleted. This class receives those notifications through
+ * {@link #addAssignmentsForPartitions(Set)} and {@link #removeAssignmentsForPartitions(Set)}, and assigns the consumer to the
+ * respective remote log metadata partitions using {@link RemoteLogMetadataTopicPartitioner#metadataPartition(TopicIdPartition)}.
+ * Any later leadership changes arrive through the same APIs. Partitions that are deleted from
+ * this broker are removed through {@link #removeAssignmentsForPartitions(Set)}.
+ * <p>
+ * After receiving these events, it invokes {@link RemotePartitionMetadataEventHandler#handleRemoteLogSegmentMetadata(RemoteLogSegmentMetadata)},
+ * which maintains an in-memory representation of the state of {@link RemoteLogSegmentMetadata}.
+ */
+class ConsumerTask implements Runnable, Closeable {
+    private static final Logger log = LoggerFactory.getLogger(ConsumerTask.class);
+
+    private static final long POLL_INTERVAL_MS = 30L;
+
+    private final RemoteLogMetadataSerde serde = new RemoteLogMetadataSerde();
+    private final KafkaConsumer<byte[], byte[]> consumer;
+    private final RemotePartitionMetadataEventHandler remotePartitionMetadataEventHandler;
+    private final RemoteLogMetadataTopicPartitioner topicPartitioner;
+
+    // It indicates whether the closing process has been started or not. Id it is set as true,
+    // consumer will stop consuming messages and it will not allow partition assignments to be updated.
+    private volatile boolean close = false;
+    private volatile boolean assignPartitions = false;
+
+    private final Object assignPartitionsLock = new Object();
+
+    // Remote log metadata topic partitions that consumer is assigned to.
+    private volatile Set<Integer> assignedMetaPartitions = Collections.emptySet();
+
+    // User topic partitions that this broker is a leader/follower for.
+    private Set<TopicIdPartition> assignedTopicPartitions = Collections.emptySet();
+
+    // Map of remote log metadata topic partition to consumed offsets.
+    private final Map<Integer, Long> partitionToConsumedOffsets = new ConcurrentHashMap<>();
+
+    public ConsumerTask(KafkaConsumer<byte[], byte[]> consumer,
+                        RemotePartitionMetadataEventHandler remotePartitionMetadataEventHandler,
+                        RemoteLogMetadataTopicPartitioner topicPartitioner) {
+        this.consumer = consumer;
+        this.remotePartitionMetadataEventHandler = remotePartitionMetadataEventHandler;
+        this.topicPartitioner = topicPartitioner;
+    }
+
+    @Override
+    public void run() {
+        log.info("Started Consumer task thread.");
+        try {
+            while (!close) {
+                Set<Integer> assignedMetaPartitionsSnapshot = maybeWaitForPartitionsAssignment();
+
+                if (!assignedMetaPartitionsSnapshot.isEmpty()) {
+                    executeReassignment(assignedMetaPartitionsSnapshot);
+                }
+
+                log.info("Polling consumer to receive remote log metadata topic records");
+                ConsumerRecords<byte[], byte[]> consumerRecords
+                        = consumer.poll(Duration.ofMillis(POLL_INTERVAL_MS));
+                for (ConsumerRecord<byte[], byte[]> record : consumerRecords) {
+                    handleRemoteLogMetadata(serde.deserialize(record.value()));
+                    partitionToConsumedOffsets.put(record.partition(), record.offset());
+                }
+            }
+        } catch (Exception e) {
+            log.error("Error occurred in consumer task, close:[{}]", close, e);
+        }
+
+        closeConsumer();
+        log.info("Exiting from consumer task thread");
+    }
+
+    private void closeConsumer() {
+        log.info("Closing the consumer instance");
+        if (consumer != null) {
+            try {
+                consumer.close(Duration.ofSeconds(30));
+            } catch (Exception e) {
+                log.error("Error encountered while closing the consumer", e);
+            }
+        }
+    }
+
+    private Set<Integer> maybeWaitForPartitionsAssignment() {
+        Set<Integer> assignedMetaPartitionsSnapshot = Collections.emptySet();
+        synchronized (assignPartitionsLock) {
+            while (assignedMetaPartitions.isEmpty()) {
+                // If no partitions are assigned, wait until they are assigned.
+                log.info("Waiting for assigned remote log metadata partitions..");
+                try {
+                    assignPartitionsLock.wait();
+                } catch (InterruptedException e) {
+                    throw new KafkaException(e);
+                }
+            }
+
+            if (assignPartitions) {
+                assignedMetaPartitionsSnapshot = new HashSet<>(assignedMetaPartitions);
+                assignPartitions = false;
+            }
+        }
+        return assignedMetaPartitionsSnapshot;
+    }
+
+    private void handleRemoteLogMetadata(RemoteLogMetadata remoteLogMetadata) {
+        if (assignedTopicPartitions.contains(remoteLogMetadata.topicIdPartition())) {
+            remotePartitionMetadataEventHandler.handleRemoteLogMetadata(remoteLogMetadata);
+        } else {
+            log.debug("This event {} is skipped as the topic partition is not assigned for this instance.", remoteLogMetadata);
+        }
+    }
+
+    private void executeReassignment(Set<Integer> assignedMetaPartitionsSnapshot) {
+        Set<TopicPartition> assignedMetaTopicPartitions = assignedMetaPartitionsSnapshot.stream()
+                .map(partitionNum -> new TopicPartition(REMOTE_LOG_METADATA_TOPIC_NAME, partitionNum))
+                .collect(Collectors.toSet());
+        log.info("Reassigning partitions to consumer task [{}]", assignedMetaTopicPartitions);
+        consumer.assign(assignedMetaTopicPartitions);
+    }
+
+    public void addAssignmentsForPartitions(Set<TopicIdPartition> updatedPartitions) {
+        updateAssignmentsForPartitions(updatedPartitions, Collections.emptySet());
+    }
+
+    public void removeAssignmentsForPartitions(Set<TopicIdPartition> partitions) {

Review comment:
       Same here: why are the variable names different? One is `updatedPartitions` and the other is `partitions`.

##########
File path: storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ProducerManager.java
##########
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.metadata.storage;
+
+import org.apache.kafka.clients.producer.Callback;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.TopicIdPartition;
+import org.apache.kafka.server.log.remote.metadata.storage.serialization.RemoteLogMetadataSerde;
+import org.apache.kafka.server.log.remote.storage.RemoteLogMetadata;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.time.Duration;
+
+/**
+ * This class is responsible for publishing messages into the remote log metadata topic partitions.
+ */
+public class ProducerManager implements Closeable {
+    private static final Logger log = LoggerFactory.getLogger(ProducerManager.class);
+
+    private final RemoteLogMetadataSerde serde = new RemoteLogMetadataSerde();
+    private final KafkaProducer<byte[], byte[]> producer;
+    private final RemoteLogMetadataTopicPartitioner topicPartitioner;
+    private final TopicBasedRemoteLogMetadataManagerConfig rlmmConfig;
+
+    // It indicates whether the closing process has been started. Once it is set to true,
+    // no more messages will be accepted.
+    private volatile boolean close = false;
+
+    public ProducerManager(TopicBasedRemoteLogMetadataManagerConfig rlmmConfig,
+                           RemoteLogMetadataTopicPartitioner rlmmTopicPartitioner) {
+        this.rlmmConfig = rlmmConfig;
+        this.producer = new KafkaProducer<>(rlmmConfig.producerProperties());
+        topicPartitioner = rlmmTopicPartitioner;
+    }
+
+    public RecordMetadata publishMessage(RemoteLogMetadata remoteLogMetadata) throws KafkaException {
+        ensureNotClosed();
+
+        TopicIdPartition topicIdPartition = remoteLogMetadata.topicIdPartition();
+        int metadataPartitionNum = topicPartitioner.metadataPartition(topicIdPartition);
+        log.debug("Publishing metadata message of partition:[{}] into metadata topic partition:[{}] with payload: [{}]",
+                topicIdPartition, metadataPartitionNum, remoteLogMetadata);
+
+        ProducerCallback callback = new ProducerCallback();
+        try {
+            if (metadataPartitionNum >= rlmmConfig.metadataTopicPartitionsCount()) {
+                // This should never occur as long as metadata partitions always remain the same.
+                throw new KafkaException("Chosen partition no " + metadataPartitionNum +
+                                         " is more than the partition count: " + rlmmConfig.metadataTopicPartitionsCount());
+            }
+            producer.send(new ProducerRecord<>(rlmmConfig.remoteLogMetadataTopicName(), metadataPartitionNum, null,
+                    serde.serialize(remoteLogMetadata)), callback).get();
+        } catch (KafkaException e) {
+            throw e;
+        } catch (Exception e) {
+            throw new KafkaException("Exception occurred while publishing message for topicIdPartition: " + topicIdPartition, e);
+        }
+
+        if (callback.exception() == null) {
+            return callback.recordMetadata();
+        } else {
+            Exception ex = callback.exception();
+            if (ex instanceof KafkaException) {

Review comment:
       Out of curiosity: why don't we do the check within the `new KafkaException(...)` call?
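
   e.g., a small helper could centralize the wrap-or-rethrow policy — a sketch, assuming the current semantics are kept:

```java
// Hypothetical helper: rethrows a KafkaException as-is and wraps anything else,
// so the call site shrinks to `throw asKafkaException(callback.exception());`.
private static KafkaException asKafkaException(Exception e) {
    return e instanceof KafkaException ? (KafkaException) e : new KafkaException(e);
}
```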

##########
File path: storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ProducerManager.java
##########
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.metadata.storage;
+
+import org.apache.kafka.clients.producer.Callback;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.TopicIdPartition;
+import org.apache.kafka.server.log.remote.metadata.storage.serialization.RemoteLogMetadataSerde;
+import org.apache.kafka.server.log.remote.storage.RemoteLogMetadata;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.time.Duration;
+
+/**
+ * This class is responsible for publishing messages into the remote log metadata topic partitions.
+ */
+public class ProducerManager implements Closeable {
+    private static final Logger log = LoggerFactory.getLogger(ProducerManager.class);
+
+    private final RemoteLogMetadataSerde serde = new RemoteLogMetadataSerde();
+    private final KafkaProducer<byte[], byte[]> producer;
+    private final RemoteLogMetadataTopicPartitioner topicPartitioner;
+    private final TopicBasedRemoteLogMetadataManagerConfig rlmmConfig;
+
+    // It indicates whether the closing process has been started. Once it is set to true,
+    // no more messages will be accepted.
+    private volatile boolean close = false;
+
+    public ProducerManager(TopicBasedRemoteLogMetadataManagerConfig rlmmConfig,
+                           RemoteLogMetadataTopicPartitioner rlmmTopicPartitioner) {
+        this.rlmmConfig = rlmmConfig;
+        this.producer = new KafkaProducer<>(rlmmConfig.producerProperties());
+        topicPartitioner = rlmmTopicPartitioner;
+    }
+
+    public RecordMetadata publishMessage(RemoteLogMetadata remoteLogMetadata) throws KafkaException {
+        ensureNotClosed();
+
+        TopicIdPartition topicIdPartition = remoteLogMetadata.topicIdPartition();
+        int metadataPartitionNum = topicPartitioner.metadataPartition(topicIdPartition);
+        log.debug("Publishing metadata message of partition:[{}] into metadata topic partition:[{}] with payload: [{}]",
+                topicIdPartition, metadataPartitionNum, remoteLogMetadata);
+
+        ProducerCallback callback = new ProducerCallback();
+        try {
+            if (metadataPartitionNum >= rlmmConfig.metadataTopicPartitionsCount()) {
+                // This should never occur as long as metadata partitions always remain the same.
+                throw new KafkaException("Chosen partition no " + metadataPartitionNum +
+                                         " is more than the partition count: " + rlmmConfig.metadataTopicPartitionsCount());
+            }
+            producer.send(new ProducerRecord<>(rlmmConfig.remoteLogMetadataTopicName(), metadataPartitionNum, null,
+                    serde.serialize(remoteLogMetadata)), callback).get();
+        } catch (KafkaException e) {
+            throw e;
+        } catch (Exception e) {
+            throw new KafkaException("Exception occurred while publishing message for topicIdPartition: " + topicIdPartition, e);
+        }
+
+        if (callback.exception() == null) {
+            return callback.recordMetadata();
+        } else {
+            Exception ex = callback.exception();
+            if (ex instanceof KafkaException) {
+                throw (KafkaException) ex;
+            } else {
+                throw new KafkaException(ex);
+            }
+        }
+    }
+
+    private void ensureNotClosed() {
+        if (close) {
+            throw new IllegalStateException("This instance is already closed.");
+        }
+    }
+
+    public void close() {
+        // If it is already closed, return from here.
+        if (close) {
+            return;
+        }
+
+        close = true;
+
+        if (producer != null) {

Review comment:
       is the check necessary?
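
   Since `producer` is final and assigned in the constructor, it should never be null here — a sketch of the simplified close (the 30-second timeout is my assumption, not from the PR):

```java
public void close() {
    // Already closing/closed; nothing more to do.
    if (close) {
        return;
    }
    close = true;
    // `producer` is final and constructor-initialized, so no null guard is needed.
    producer.close(Duration.ofSeconds(30)); // assumed timeout value
}
```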

##########
File path: storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTask.java
##########
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.metadata.storage;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.TopicIdPartition;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.server.log.remote.metadata.storage.serialization.RemoteLogMetadataSerde;
+import org.apache.kafka.server.log.remote.storage.RemoteLogMetadata;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.time.Duration;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+
+import static org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_TOPIC_NAME;
+
+/**
+ * This class is responsible for consuming messages from the remote log metadata topic ({@link TopicBasedRemoteLogMetadataManagerConfig#REMOTE_LOG_METADATA_TOPIC_NAME})
+ * partitions and maintaining the state of the remote log segment metadata. It provides an API to add or remove
+ * the topic partitions whose metadata should be consumed by this instance, using
+ * {@link #addAssignmentsForPartitions(Set)} and {@link #removeAssignmentsForPartitions(Set)} respectively.
+ * <p>
+ * When a broker is started, the controller sends the topic partitions that this broker is a leader or follower for, along with the
+ * partitions to be deleted. This class receives those notifications through
+ * {@link #addAssignmentsForPartitions(Set)} and {@link #removeAssignmentsForPartitions(Set)}, and assigns the consumer to the
+ * respective remote log metadata partitions using {@link RemoteLogMetadataTopicPartitioner#metadataPartition(TopicIdPartition)}.
+ * Any later leadership changes arrive through the same APIs. Partitions that are deleted from
+ * this broker are removed through {@link #removeAssignmentsForPartitions(Set)}.
+ * <p>
+ * After receiving these events, it invokes {@link RemotePartitionMetadataEventHandler#handleRemoteLogSegmentMetadata(RemoteLogSegmentMetadata)},
+ * which maintains an in-memory representation of the state of {@link RemoteLogSegmentMetadata}.
+ */
+class ConsumerTask implements Runnable, Closeable {
+    private static final Logger log = LoggerFactory.getLogger(ConsumerTask.class);
+
+    private static final long POLL_INTERVAL_MS = 30L;
+
+    private final RemoteLogMetadataSerde serde = new RemoteLogMetadataSerde();
+    private final KafkaConsumer<byte[], byte[]> consumer;
+    private final RemotePartitionMetadataEventHandler remotePartitionMetadataEventHandler;
+    private final RemoteLogMetadataTopicPartitioner topicPartitioner;
+
+    private volatile boolean close = false;

Review comment:
       We may want a better variable name here. e.g., `isClosing` or `closed` or something else.
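
   An `AtomicBoolean` with a descriptive name would also make the double-close guard explicit — a standalone sketch of that idea:

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical alternative: `closing` states what the flag means, and
// compareAndSet ensures the shutdown work runs exactly once.
class ClosingFlagExample implements AutoCloseable {
    private final AtomicBoolean closing = new AtomicBoolean(false);

    boolean isClosing() {
        return closing.get();
    }

    @Override
    public void close() {
        if (closing.compareAndSet(false, true)) {
            // Release resources exactly once here.
        }
    }
}
```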

##########
File path: storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTask.java
##########
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.metadata.storage;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.TopicIdPartition;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.server.log.remote.metadata.storage.serialization.RemoteLogMetadataSerde;
+import org.apache.kafka.server.log.remote.storage.RemoteLogMetadata;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.time.Duration;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+
+import static org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_TOPIC_NAME;
+
+/**
+ * This class is responsible for consuming messages from the remote log metadata topic ({@link TopicBasedRemoteLogMetadataManagerConfig#REMOTE_LOG_METADATA_TOPIC_NAME})
+ * partitions and maintaining the state of the remote log segment metadata. It provides an API to add or remove
+ * the topic partitions whose metadata should be consumed by this instance, using
+ * {@link #addAssignmentsForPartitions(Set)} and {@link #removeAssignmentsForPartitions(Set)} respectively.
+ * <p>
+ * When a broker is started, the controller sends the topic partitions that this broker is a leader or follower for, along with the
+ * partitions to be deleted. This class receives those notifications through
+ * {@link #addAssignmentsForPartitions(Set)} and {@link #removeAssignmentsForPartitions(Set)}, and assigns the consumer to the
+ * respective remote log metadata partitions using {@link RemoteLogMetadataTopicPartitioner#metadataPartition(TopicIdPartition)}.
+ * Any later leadership changes arrive through the same APIs. Partitions that are deleted from
+ * this broker are removed through {@link #removeAssignmentsForPartitions(Set)}.
+ * <p>
+ * After receiving these events, it invokes {@link RemotePartitionMetadataEventHandler#handleRemoteLogSegmentMetadata(RemoteLogSegmentMetadata)},
+ * which maintains an in-memory representation of the state of {@link RemoteLogSegmentMetadata}.
+ */
+class ConsumerTask implements Runnable, Closeable {
+    private static final Logger log = LoggerFactory.getLogger(ConsumerTask.class);
+
+    private static final long POLL_INTERVAL_MS = 30L;
+
+    private final RemoteLogMetadataSerde serde = new RemoteLogMetadataSerde();
+    private final KafkaConsumer<byte[], byte[]> consumer;
+    private final RemotePartitionMetadataEventHandler remotePartitionMetadataEventHandler;
+    private final RemoteLogMetadataTopicPartitioner topicPartitioner;
+
+    // It indicates whether the closing process has been started or not. Id it is set as true,

Review comment:
       `Id` -> `If`

##########
File path: storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerManager.java
##########
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.metadata.storage;
+
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.TopicIdPartition;
+import org.apache.kafka.common.utils.KafkaThread;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Optional;
+import java.util.Set;
+
+/**
+ * This class manages the consumer thread ({@link ConsumerTask}) that polls messages from the assigned metadata topic partitions.
+ * It also provides a way to wait until a given record is received by the consumer, timing out after
+ * {@link TopicBasedRemoteLogMetadataManagerConfig#consumeWaitMs()}.
+ */
+public class ConsumerManager implements Closeable {
+
+    private static final Logger log = LoggerFactory.getLogger(ConsumerManager.class);
+    private static final long CONSUME_RECHECK_INTERVAL_MS = 50L;
+
+    private final TopicBasedRemoteLogMetadataManagerConfig rlmmConfig;
+    private final Time time;
+    private final ConsumerTask consumerTask;
+    private final Thread consumerTaskThread;
+
+    public ConsumerManager(TopicBasedRemoteLogMetadataManagerConfig rlmmConfig,
+                           RemotePartitionMetadataEventHandler remotePartitionMetadataEventHandler,
+                           RemoteLogMetadataTopicPartitioner rlmmTopicPartitioner,
+                           Time time) {
+        this.rlmmConfig = rlmmConfig;
+        this.time = time;
+
+        // Create a task to consume messages and submit the respective events to RemotePartitionMetadataEventHandler.
+        KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(rlmmConfig.consumerProperties());
+        consumerTask = new ConsumerTask(consumer, remotePartitionMetadataEventHandler, rlmmTopicPartitioner);
+        consumerTaskThread = KafkaThread.daemon("RLMMConsumerTask", consumerTask);
+    }
+
+    public void startConsumerThread() {
+        try {
+            // Start a thread to continuously consume records from topic partitions.
+            consumerTaskThread.start();
+        } catch (Exception e) {
+            throw new KafkaException("Error encountered while initializing and scheduling ConsumerTask thread", e);
+        }
+    }
+
+    /**
+     * Wait until the consumption reaches the offset of the metadata partition for the given {@code recordMetadata}.
+     *
+     * @param recordMetadata record metadata to be checked for consumption.
+     */
+    public void waitTillConsumptionCatchesUp(RecordMetadata recordMetadata) {
+        final int partition = recordMetadata.partition();
+
+        // If the current assignment does not have the subscription for this partition then return immediately.
+        if (!assignedPartition(partition)) {
+            log.warn("This consumer is not subscribed to the target partition [{}] on which message is produced.",
+                    partition);
+            return;
+        }
+
+        final long offset = recordMetadata.offset();
+        long startTimeMillis = time.milliseconds();

Review comment:
       I think the codebase uses `Ms` more often than `Millis`.

##########
File path: storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTask.java
##########
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.metadata.storage;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.TopicIdPartition;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.server.log.remote.metadata.storage.serialization.RemoteLogMetadataSerde;
+import org.apache.kafka.server.log.remote.storage.RemoteLogMetadata;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.time.Duration;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+
+import static org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_TOPIC_NAME;
+
+/**
+ * This class is responsible for consuming messages from the remote log metadata topic ({@link TopicBasedRemoteLogMetadataManagerConfig#REMOTE_LOG_METADATA_TOPIC_NAME})
+ * partitions and maintaining the state of the remote log segment metadata. It provides an API to add or remove
+ * the topic partitions whose metadata should be consumed by this instance, using
+ * {@link #addAssignmentsForPartitions(Set)} and {@link #removeAssignmentsForPartitions(Set)} respectively.
+ * <p>
+ * When a broker is started, the controller sends the topic partitions that this broker is a leader or follower for, along with the
+ * partitions to be deleted. This class receives those notifications through
+ * {@link #addAssignmentsForPartitions(Set)} and {@link #removeAssignmentsForPartitions(Set)}, and assigns the consumer to the
+ * respective remote log metadata partitions using {@link RemoteLogMetadataTopicPartitioner#metadataPartition(TopicIdPartition)}.
+ * Any later leadership changes arrive through the same APIs. Partitions that are deleted from
+ * this broker are removed through {@link #removeAssignmentsForPartitions(Set)}.
+ * <p>
+ * After receiving these events, it invokes {@link RemotePartitionMetadataEventHandler#handleRemoteLogSegmentMetadata(RemoteLogSegmentMetadata)},
+ * which maintains an in-memory representation of the state of {@link RemoteLogSegmentMetadata}.
+ */
+class ConsumerTask implements Runnable, Closeable {
+    private static final Logger log = LoggerFactory.getLogger(ConsumerTask.class);
+
+    private static final long POLL_INTERVAL_MS = 30L;
+
+    private final RemoteLogMetadataSerde serde = new RemoteLogMetadataSerde();
+    private final KafkaConsumer<byte[], byte[]> consumer;
+    private final RemotePartitionMetadataEventHandler remotePartitionMetadataEventHandler;
+    private final RemoteLogMetadataTopicPartitioner topicPartitioner;
+
+    // It indicates whether the closing process has been started or not. Id it is set as true,
+    // consumer will stop consuming messages and it will not allow partition assignments to be updated.
+    private volatile boolean close = false;
+    private volatile boolean assignPartitions = false;
+
+    private final Object assignPartitionsLock = new Object();
+
+    // Remote log metadata topic partitions that consumer is assigned to.
+    private volatile Set<Integer> assignedMetaPartitions = Collections.emptySet();
+
+    // User topic partitions that this broker is a leader/follower for.
+    private Set<TopicIdPartition> assignedTopicPartitions = Collections.emptySet();
+
+    // Map of remote log metadata topic partition to consumed offsets.
+    private final Map<Integer, Long> partitionToConsumedOffsets = new ConcurrentHashMap<>();
+
+    public ConsumerTask(KafkaConsumer<byte[], byte[]> consumer,
+                        RemotePartitionMetadataEventHandler remotePartitionMetadataEventHandler,
+                        RemoteLogMetadataTopicPartitioner topicPartitioner) {
+        this.consumer = consumer;
+        this.remotePartitionMetadataEventHandler = remotePartitionMetadataEventHandler;
+        this.topicPartitioner = topicPartitioner;
+    }
+
+    @Override
+    public void run() {
+        log.info("Started Consumer task thread.");
+        try {
+            while (!close) {
+                Set<Integer> assignedMetaPartitionsSnapshot = maybeWaitForPartitionsAssignment();
+
+                if (!assignedMetaPartitionsSnapshot.isEmpty()) {
+                    executeReassignment(assignedMetaPartitionsSnapshot);
+                }
+
+                log.info("Polling consumer to receive remote log metadata topic records");
+                ConsumerRecords<byte[], byte[]> consumerRecords
+                        = consumer.poll(Duration.ofMillis(POLL_INTERVAL_MS));
+                for (ConsumerRecord<byte[], byte[]> record : consumerRecords) {
+                    handleRemoteLogMetadata(serde.deserialize(record.value()));
+                    partitionToConsumedOffsets.put(record.partition(), record.offset());
+                }
+            }
+        } catch (Exception e) {
+            log.error("Error occurred in consumer task, close:[{}]", close, e);
+        }
+
+        closeConsumer();
+        log.info("Exiting from consumer task thread");
+    }
+
+    private void closeConsumer() {
+        log.info("Closing the consumer instance");
+        if (consumer != null) {
+            try {
+                consumer.close(Duration.ofSeconds(30));
+            } catch (Exception e) {
+                log.error("Error encountered while closing the consumer", e);
+            }
+        }
+    }
+
+    private Set<Integer> maybeWaitForPartitionsAssignment() {
+        Set<Integer> assignedMetaPartitionsSnapshot = Collections.emptySet();
+        synchronized (assignPartitionsLock) {
+            while (assignedMetaPartitions.isEmpty()) {
+                // If no partitions are assigned, wait until they are assigned.
+                log.info("Waiting for assigned remote log metadata partitions..");
+                try {
+                    assignPartitionsLock.wait();
+                } catch (InterruptedException e) {
+                    throw new KafkaException(e);
+                }
+            }
+
+            if (assignPartitions) {
+                assignedMetaPartitionsSnapshot = new HashSet<>(assignedMetaPartitions);
+                assignPartitions = false;
+            }
+        }
+        return assignedMetaPartitionsSnapshot;
+    }
+
+    private void handleRemoteLogMetadata(RemoteLogMetadata remoteLogMetadata) {
+        if (assignedTopicPartitions.contains(remoteLogMetadata.topicIdPartition())) {
+            remotePartitionMetadataEventHandler.handleRemoteLogMetadata(remoteLogMetadata);
+        } else {
+            log.debug("This event {} is skipped as the topic partition is not assigned for this instance.", remoteLogMetadata);
+        }
+    }
+
+    private void executeReassignment(Set<Integer> assignedMetaPartitionsSnapshot) {
+        Set<TopicPartition> assignedMetaTopicPartitions = assignedMetaPartitionsSnapshot.stream()
+                .map(partitionNum -> new TopicPartition(REMOTE_LOG_METADATA_TOPIC_NAME, partitionNum))
+                .collect(Collectors.toSet());
+        log.info("Reassigning partitions to consumer task [{}]", assignedMetaTopicPartitions);
+        consumer.assign(assignedMetaTopicPartitions);
+    }
+
+    public void addAssignmentsForPartitions(Set<TopicIdPartition> updatedPartitions) {
+        updateAssignmentsForPartitions(updatedPartitions, Collections.emptySet());
+    }
+
+    public void removeAssignmentsForPartitions(Set<TopicIdPartition> partitions) {
+        updateAssignmentsForPartitions(Collections.emptySet(), partitions);
+    }
+
+    private void updateAssignmentsForPartitions(Set<TopicIdPartition> addedPartitions,
+                                                Set<TopicIdPartition> removedPartitions) {
+        log.info("Updating assignments for addedPartitions: {} and removedPartitions: {}", addedPartitions, removedPartitions);
+        ensureNotClosed();
+
+        Objects.requireNonNull(addedPartitions, "addedPartitions must not be null");
+        Objects.requireNonNull(removedPartitions, "removedPartitions must not be null");
+
+        if (addedPartitions.isEmpty() && removedPartitions.isEmpty()) {
+            return;
+        }
+
+        synchronized (assignPartitionsLock) {
+            Set<TopicIdPartition> updatedReassignedPartitions = new HashSet<>(assignedTopicPartitions);
+            updatedReassignedPartitions.addAll(addedPartitions);
+            updatedReassignedPartitions.removeAll(removedPartitions);
+            Set<Integer> updatedAssignedMetaPartitions = new HashSet<>();
+            for (TopicIdPartition tp : updatedReassignedPartitions) {
+                updatedAssignedMetaPartitions.add(topicPartitioner.metadataPartition(tp));
+            }
+            assignedTopicPartitions = Collections.unmodifiableSet(updatedReassignedPartitions);
+            log.debug("Assigned topic partitions: {}", assignedTopicPartitions);
+
+            if (!updatedAssignedMetaPartitions.equals(assignedMetaPartitions)) {
+                assignedMetaPartitions = Collections.unmodifiableSet(updatedAssignedMetaPartitions);
+                log.debug("Assigned metadata topic partitions: {}", assignedMetaPartitions);
+                assignPartitions = true;
+                assignPartitionsLock.notifyAll();
+            } else {
+                log.debug("No change in assigned metadata topic partitions: {}", assignedMetaPartitions);
+            }
+        }
+    }
+
+    public Optional<Long> receivedOffsetForPartition(int partition) {
+        return Optional.ofNullable(partitionToConsumedOffsets.get(partition));
+    }
+
+    public boolean assignedPartition(int partition) {

Review comment:
       The function name is easy to confuse with variable names in the same class, e.g., `assignPartitions` and `assignedTopicPartitions`. Would it be clearer to rename `assignedPartition` to `isAssignedPartition`?
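
       A minimal sketch of the suggested rename (`isAssignedPartition` is my suggestion, not a name in the PR):

           public boolean isAssignedPartition(int partition) {
               return assignedMetaPartitions.contains(partition);
           }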

##########
File path: storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTask.java
##########
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.metadata.storage;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.TopicIdPartition;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.server.log.remote.metadata.storage.serialization.RemoteLogMetadataSerde;
+import org.apache.kafka.server.log.remote.storage.RemoteLogMetadata;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.time.Duration;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+
+import static org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_TOPIC_NAME;
+
+/**
+ * This class is responsible for consuming messages from remote log metadata topic ({@link TopicBasedRemoteLogMetadataManagerConfig#REMOTE_LOG_METADATA_TOPIC_NAME})
+ * partitions and maintains the state of the remote log segment metadata. It provides APIs to add or remove
+ * the topic partitions whose metadata should be consumed by this instance, using
+ * {@link #addAssignmentsForPartitions(Set)} and {@link #removeAssignmentsForPartitions(Set)} respectively.
+ * <p>
+ * When a broker is started, the controller sends the topic partitions that this broker is a leader or follower for,
+ * together with the partitions to be deleted. This class receives those notifications via
+ * {@link #addAssignmentsForPartitions(Set)} and {@link #removeAssignmentsForPartitions(Set)} and assigns the consumer to the
+ * respective remote log metadata partitions using {@link RemoteLogMetadataTopicPartitioner#metadataPartition(TopicIdPartition)}.
+ * Any later leadership changes arrive through the same APIs. Partitions deleted from this broker are removed
+ * when they are received through {@link #removeAssignmentsForPartitions(Set)}.
+ * <p>
+ * After receiving these events it invokes {@link RemotePartitionMetadataEventHandler#handleRemoteLogSegmentMetadata(RemoteLogSegmentMetadata)},
+ * which maintains in-memory representation of the state of {@link RemoteLogSegmentMetadata}.
+ */
+class ConsumerTask implements Runnable, Closeable {
+    private static final Logger log = LoggerFactory.getLogger(ConsumerTask.class);
+
+    private static final long POLL_INTERVAL_MS = 30L;
+
+    private final RemoteLogMetadataSerde serde = new RemoteLogMetadataSerde();
+    private final KafkaConsumer<byte[], byte[]> consumer;
+    private final RemotePartitionMetadataEventHandler remotePartitionMetadataEventHandler;
+    private final RemoteLogMetadataTopicPartitioner topicPartitioner;
+
+    // It indicates whether the closing process has been started. If it is set to true,
+    // the consumer will stop consuming messages and will not allow partition assignments to be updated.
+    private volatile boolean close = false;
+    private volatile boolean assignPartitions = false;
+
+    private final Object assignPartitionsLock = new Object();
+
+    // Remote log metadata topic partitions that consumer is assigned to.
+    private volatile Set<Integer> assignedMetaPartitions = Collections.emptySet();
+
+    // User topic partitions that this broker is a leader/follower for.
+    private Set<TopicIdPartition> assignedTopicPartitions = Collections.emptySet();
+
+    // Map of remote log metadata topic partition to consumed offsets.
+    private final Map<Integer, Long> partitionToConsumedOffsets = new ConcurrentHashMap<>();
+
+    public ConsumerTask(KafkaConsumer<byte[], byte[]> consumer,
+                        RemotePartitionMetadataEventHandler remotePartitionMetadataEventHandler,
+                        RemoteLogMetadataTopicPartitioner topicPartitioner) {
+        this.consumer = consumer;
+        this.remotePartitionMetadataEventHandler = remotePartitionMetadataEventHandler;
+        this.topicPartitioner = topicPartitioner;
+    }
+
+    @Override
+    public void run() {
+        log.info("Started Consumer task thread.");
+        try {
+            while (!close) {

Review comment:
       Multiple threads are reading and writing `close`, which is not thread safe.
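
       One possible fix (only a sketch, assuming the close semantics stay the same) is an `AtomicBoolean`, so the close transition happens atomically and exactly once:

           import java.util.concurrent.atomic.AtomicBoolean;

           class ClosableLoop implements Runnable {
               // Hypothetical class and field names; the AtomicBoolean replaces the volatile flag.
               private final AtomicBoolean closing = new AtomicBoolean(false);

               @Override
               public void run() {
                   while (!closing.get()) {
                       // poll and process records
                   }
               }

               public void close() {
                   // compareAndSet guarantees the shutdown path runs at most once,
                   // even when close() is called concurrently from multiple threads.
                   if (closing.compareAndSet(false, true)) {
                       // release resources / wake up waiters
                   }
               }
           }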

##########
File path: storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManager.java
##########
@@ -0,0 +1,336 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.metadata.storage;
+
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.TopicIdPartition;
+import org.apache.kafka.common.errors.RetriableException;
+import org.apache.kafka.common.utils.KafkaThread;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.log.remote.storage.RemoteLogMetadata;
+import org.apache.kafka.server.log.remote.storage.RemoteLogMetadataManager;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadataUpdate;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentState;
+import org.apache.kafka.server.log.remote.storage.RemotePartitionDeleteMetadata;
+import org.apache.kafka.server.log.remote.storage.RemoteStorageException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+/**
+ * This is the {@link RemoteLogMetadataManager} implementation with storage as an internal topic with name {@link TopicBasedRemoteLogMetadataManagerConfig#REMOTE_LOG_METADATA_TOPIC_NAME}.
+ * This is used to publish and fetch {@link RemoteLogMetadata} for the registered user topic partitions with
+ * {@link #onPartitionLeadershipChanges(Set, Set)}. Each broker will have an instance of this class and it subscribes
+ * to metadata updates for the registered user topic partitions.
+ */
+public class TopicBasedRemoteLogMetadataManager implements RemoteLogMetadataManager {
+    private static final Logger log = LoggerFactory.getLogger(TopicBasedRemoteLogMetadataManager.class);
+
+    private volatile boolean configured = false;
+
+    // It indicates whether the closing of this instance has been started via the #close() method.
+    // Using AtomicBoolean instead of volatile as it may encounter http://findbugs.sourceforge.net/bugDescriptions.html#SP_SPIN_ON_FIELD
+    // if the field is read but not updated in a spin loop like in #initializeResources() method.
+    private final AtomicBoolean close = new AtomicBoolean(false);
+    private final AtomicBoolean initialized = new AtomicBoolean(false);
+
+    private Thread initializationThread;
+    private Time time = Time.SYSTEM;
+    private volatile ProducerManager producerManager;
+    private volatile ConsumerManager consumerManager;
+
+    // This allows gracefully closing this instance via the {@link #close()} method while there are pending or new
+    // requests calling other methods that use resources like the producer/consumer managers.
+    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+
+    private final RemotePartitionMetadataStore remotePartitionMetadataStore = new RemotePartitionMetadataStore();
+    private volatile TopicBasedRemoteLogMetadataManagerConfig rlmmConfig;
+    private RemoteLogMetadataTopicPartitioner rlmmTopicPartitioner;
+
+    public TopicBasedRemoteLogMetadataManager() {
+    }
+
+    // Visible for testing.
+    public TopicBasedRemoteLogMetadataManager(Time time) {
+        this.time = time;
+    }
+
+    @Override
+    public void addRemoteLogSegmentMetadata(RemoteLogSegmentMetadata remoteLogSegmentMetadata)
+            throws RemoteStorageException {
+        Objects.requireNonNull(remoteLogSegmentMetadata, "remoteLogSegmentMetadata can not be null");
+
+        ensureInitializedAndNotClosed();
+
+        // This allows gracefully rejecting the requests while closing of this instance is in progress, which triggers
+        // closing the producer/consumer manager instances.
+        lock.readLock().lock();
+        try {
+
+            // This method only allows adding a remote log segment with the initial state
+            // (RemoteLogSegmentState.COPY_SEGMENT_STARTED); it must not update existing remote log segment metadata.
+            if (remoteLogSegmentMetadata.state() != RemoteLogSegmentState.COPY_SEGMENT_STARTED) {
+                throw new IllegalArgumentException(
+                        "Given remoteLogSegmentMetadata should have state as " + RemoteLogSegmentState.COPY_SEGMENT_STARTED
+                        + " but it contains state as: " + remoteLogSegmentMetadata.state());
+            }
+
+            // Publish the message to the topic.
+            doPublishMetadata(remoteLogSegmentMetadata.remoteLogSegmentId().topicIdPartition(),
+                              remoteLogSegmentMetadata);
+        } finally {
+            lock.readLock().unlock();
+        }
+    }
+
+    @Override
+    public void updateRemoteLogSegmentMetadata(RemoteLogSegmentMetadataUpdate segmentMetadataUpdate)
+            throws RemoteStorageException {
+        Objects.requireNonNull(segmentMetadataUpdate, "segmentMetadataUpdate can not be null");
+
+        ensureInitializedAndNotClosed();
+
+        lock.readLock().lock();
+        try {
+            // Callers should use addRemoteLogSegmentMetadata to add RemoteLogSegmentMetadata with state as
+            // RemoteLogSegmentState.COPY_SEGMENT_STARTED.
+            if (segmentMetadataUpdate.state() == RemoteLogSegmentState.COPY_SEGMENT_STARTED) {
+                throw new IllegalArgumentException("Given remoteLogSegmentMetadata should not have the state as: "
+                                                   + RemoteLogSegmentState.COPY_SEGMENT_STARTED);
+            }
+
+            // Publish the message to the topic.
+            doPublishMetadata(segmentMetadataUpdate.remoteLogSegmentId().topicIdPartition(), segmentMetadataUpdate);
+        } finally {
+            lock.readLock().unlock();
+        }
+    }
+
+    @Override
+    public void putRemotePartitionDeleteMetadata(RemotePartitionDeleteMetadata remotePartitionDeleteMetadata)

Review comment:
       The method names `addRemoteLogSegmentMetadata`, `updateRemoteLogSegmentMetadata`, and `putRemotePartitionDeleteMetadata` do not follow a consistent naming scheme. Is there a way to make them more uniform?
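
       A hypothetical direction (purely a naming sketch from me, not something this PR proposes) is to use the same verb for all three write operations and let the parameter type carry the distinction:

           void putRemoteLogSegmentMetadata(RemoteLogSegmentMetadata metadata);
           void putRemoteLogSegmentMetadataUpdate(RemoteLogSegmentMetadataUpdate update);
           void putRemotePartitionDeleteMetadata(RemotePartitionDeleteMetadata metadata);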

##########
File path: storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ProducerManager.java
##########
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.metadata.storage;
+
+import org.apache.kafka.clients.producer.Callback;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.TopicIdPartition;
+import org.apache.kafka.server.log.remote.metadata.storage.serialization.RemoteLogMetadataSerde;
+import org.apache.kafka.server.log.remote.storage.RemoteLogMetadata;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.time.Duration;
+
+/**
+ * This class is responsible for publishing messages into the remote log metadata topic partitions.
+ */
+public class ProducerManager implements Closeable {
+    private static final Logger log = LoggerFactory.getLogger(ProducerManager.class);
+
+    private final RemoteLogMetadataSerde serde = new RemoteLogMetadataSerde();
+    private final KafkaProducer<byte[], byte[]> producer;
+    private final RemoteLogMetadataTopicPartitioner topicPartitioner;
+    private final TopicBasedRemoteLogMetadataManagerConfig rlmmConfig;
+
+    // It indicates whether the closing process has been started. No messages are accepted
+    // once it is set to true.
+    private volatile boolean close = false;
+
+    public ProducerManager(TopicBasedRemoteLogMetadataManagerConfig rlmmConfig,
+                           RemoteLogMetadataTopicPartitioner rlmmTopicPartitioner) {
+        this.rlmmConfig = rlmmConfig;
+        this.producer = new KafkaProducer<>(rlmmConfig.producerProperties());
+        topicPartitioner = rlmmTopicPartitioner;
+    }
+
+    public RecordMetadata publishMessage(RemoteLogMetadata remoteLogMetadata) throws KafkaException {
+        ensureNotClosed();
+
+        TopicIdPartition topicIdPartition = remoteLogMetadata.topicIdPartition();
+        int metadataPartitionNum = topicPartitioner.metadataPartition(topicIdPartition);
+        log.debug("Publishing metadata message of partition:[{}] into metadata topic partition:[{}] with payload: [{}]",
+                topicIdPartition, metadataPartitionNum, remoteLogMetadata);
+
+        ProducerCallback callback = new ProducerCallback();
+        try {
+            if (metadataPartitionNum >= rlmmConfig.metadataTopicPartitionsCount()) {
+                // This should never occur as long as metadata partitions always remain the same.
+                throw new KafkaException("Chosen partition no " + metadataPartitionNum +

Review comment:
       Will the exception be caught by your own catch in line 74? Maybe move the check out of the `try` block?
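
       A method-body sketch of what I mean, assuming the catch at line 74 wraps and rethrows exceptions from the send path:

           // Validate before entering the try block so this sanity check cannot be
           // swallowed or re-wrapped by the send-path exception handling.
           if (metadataPartitionNum >= rlmmConfig.metadataTopicPartitionsCount()) {
               // This should never occur as long as metadata partitions always remain the same.
               throw new KafkaException("Chosen partition " + metadataPartitionNum
                       + " must be less than the partition count: " + rlmmConfig.metadataTopicPartitionsCount());
           }
           try {
               // build the ProducerRecord and send it with the callback
           } catch (Exception e) {
               throw new KafkaException("Exception occurred while publishing metadata", e);
           }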

##########
File path: storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataTopicPartitioner.java
##########
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.metadata.storage;
+
+import org.apache.kafka.common.TopicIdPartition;
+import org.apache.kafka.common.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Objects;
+
+public class RemoteLogMetadataTopicPartitioner {
+    public static final Logger log = LoggerFactory.getLogger(RemoteLogMetadataTopicPartitioner.class);
+    private final int numMetadataTopicPartitions;
+
+    public RemoteLogMetadataTopicPartitioner(int numMetadataTopicPartitions) {
+        this.numMetadataTopicPartitions = numMetadataTopicPartitions;
+    }
+
+    public int metadataPartition(TopicIdPartition topicIdPartition) {
+        Objects.requireNonNull(topicIdPartition, "TopicPartition can not be null");
+
+        int partitionNum = Utils.toPositive(Utils.murmur2(toBytes(topicIdPartition))) % numMetadataTopicPartitions;
+        log.debug("No of partitions [{}], partitionNum: [{}] for given topic: [{}]", numMetadataTopicPartitions, partitionNum, topicIdPartition);
+        return partitionNum;
+    }
+
+    private byte[] toBytes(TopicIdPartition topicIdPartition) {
+        // We do not want to depend upon hash code generation of Uuid as that may change.
+        int hash = Objects.hash(topicIdPartition.topicId().getLeastSignificantBits(),
+                                topicIdPartition.topicId().getMostSignificantBits(),
+                                topicIdPartition.topicPartition().partition());
+
+        return toBytes(hash);
+    }
+
+    private byte[] toBytes(int n) {
+        return new byte[]{
+            (byte) (n >> 24),
+            (byte) (n >> 16),
+            (byte) (n >> 8),
+            (byte) n
+        };
+    }
+}

Review comment:
       Add a newline at the end of the file.

##########
File path: storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTask.java
##########
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.metadata.storage;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.TopicIdPartition;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.server.log.remote.metadata.storage.serialization.RemoteLogMetadataSerde;
+import org.apache.kafka.server.log.remote.storage.RemoteLogMetadata;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.time.Duration;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+
+import static org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_TOPIC_NAME;
+
+/**
+ * This class is responsible for consuming messages from remote log metadata topic ({@link TopicBasedRemoteLogMetadataManagerConfig#REMOTE_LOG_METADATA_TOPIC_NAME})
+ * partitions and maintains the state of the remote log segment metadata. It provides APIs to add or remove
+ * the topic partitions whose metadata should be consumed by this instance, using
+ * {@link #addAssignmentsForPartitions(Set)} and {@link #removeAssignmentsForPartitions(Set)} respectively.
+ * <p>
+ * When a broker is started, the controller sends the topic partitions that this broker is a leader or follower for,
+ * together with the partitions to be deleted. This class receives those notifications via
+ * {@link #addAssignmentsForPartitions(Set)} and {@link #removeAssignmentsForPartitions(Set)} and assigns the consumer to the
+ * respective remote log metadata partitions using {@link RemoteLogMetadataTopicPartitioner#metadataPartition(TopicIdPartition)}.
+ * Any later leadership changes arrive through the same APIs. Partitions deleted from this broker are removed
+ * when they are received through {@link #removeAssignmentsForPartitions(Set)}.
+ * <p>
+ * After receiving these events it invokes {@link RemotePartitionMetadataEventHandler#handleRemoteLogSegmentMetadata(RemoteLogSegmentMetadata)},
+ * which maintains in-memory representation of the state of {@link RemoteLogSegmentMetadata}.
+ */
+class ConsumerTask implements Runnable, Closeable {
+    private static final Logger log = LoggerFactory.getLogger(ConsumerTask.class);
+
+    private static final long POLL_INTERVAL_MS = 30L;
+
+    private final RemoteLogMetadataSerde serde = new RemoteLogMetadataSerde();
+    private final KafkaConsumer<byte[], byte[]> consumer;
+    private final RemotePartitionMetadataEventHandler remotePartitionMetadataEventHandler;
+    private final RemoteLogMetadataTopicPartitioner topicPartitioner;
+
+    // It indicates whether the closing process has been started. If it is set to true,
+    // the consumer will stop consuming messages and will not allow partition assignments to be updated.
+    private volatile boolean close = false;
+    private volatile boolean assignPartitions = false;
+
+    private final Object assignPartitionsLock = new Object();
+
+    // Remote log metadata topic partitions that consumer is assigned to.
+    private volatile Set<Integer> assignedMetaPartitions = Collections.emptySet();
+
+    // User topic partitions that this broker is a leader/follower for.
+    private Set<TopicIdPartition> assignedTopicPartitions = Collections.emptySet();
+
+    // Map of remote log metadata topic partition to consumed offsets.
+    private final Map<Integer, Long> partitionToConsumedOffsets = new ConcurrentHashMap<>();
+
+    public ConsumerTask(KafkaConsumer<byte[], byte[]> consumer,
+                        RemotePartitionMetadataEventHandler remotePartitionMetadataEventHandler,
+                        RemoteLogMetadataTopicPartitioner topicPartitioner) {
+        this.consumer = consumer;
+        this.remotePartitionMetadataEventHandler = remotePartitionMetadataEventHandler;
+        this.topicPartitioner = topicPartitioner;
+    }
+
+    @Override
+    public void run() {
+        log.info("Started Consumer task thread.");
+        try {
+            while (!close) {
+                Set<Integer> assignedMetaPartitionsSnapshot = maybeWaitForPartitionsAssignment();
+
+                if (!assignedMetaPartitionsSnapshot.isEmpty()) {
+                    executeReassignment(assignedMetaPartitionsSnapshot);
+                }
+
+                log.info("Polling consumer to receive remote log metadata topic records");
+                ConsumerRecords<byte[], byte[]> consumerRecords
+                        = consumer.poll(Duration.ofMillis(POLL_INTERVAL_MS));
+                for (ConsumerRecord<byte[], byte[]> record : consumerRecords) {
+                    handleRemoteLogMetadata(serde.deserialize(record.value()));
+                    partitionToConsumedOffsets.put(record.partition(), record.offset());
+                }
+            }
+        } catch (Exception e) {
+            log.error("Error occurred in consumer task, close:[{}]", close, e);
+        }
+
+        closeConsumer();
+        log.info("Exiting from consumer task thread");
+    }
+
+    private void closeConsumer() {
+        log.info("Closing the consumer instance");
+        if (consumer != null) {

Review comment:
       Is the null check necessary? `consumer` is a final field that is initialized in the constructor, so it cannot be null here.
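
       A sketch of the simplified method, assuming `consumer` stays a constructor-initialized final field:

           private void closeConsumer() {
               log.info("Closing the consumer instance");
               try {
                   consumer.close(Duration.ofSeconds(30));
               } catch (Exception e) {
                   log.error("Error encountered while closing the consumer", e);
               }
           }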

##########
File path: storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTask.java
##########
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.metadata.storage;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.TopicIdPartition;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.server.log.remote.metadata.storage.serialization.RemoteLogMetadataSerde;
+import org.apache.kafka.server.log.remote.storage.RemoteLogMetadata;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.time.Duration;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+
+import static org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_TOPIC_NAME;
+
+/**
+ * This class is responsible for consuming messages from remote log metadata topic ({@link TopicBasedRemoteLogMetadataManagerConfig#REMOTE_LOG_METADATA_TOPIC_NAME})
+ * partitions and maintains the state of the remote log segment metadata. It provides APIs to add or remove
+ * the topic partitions whose metadata should be consumed by this instance, using
+ * {@link #addAssignmentsForPartitions(Set)} and {@link #removeAssignmentsForPartitions(Set)} respectively.
+ * <p>
+ * When a broker is started, the controller sends the topic partitions that this broker is a leader or follower for,
+ * together with the partitions to be deleted. This class receives those notifications via
+ * {@link #addAssignmentsForPartitions(Set)} and {@link #removeAssignmentsForPartitions(Set)} and assigns the consumer to the
+ * respective remote log metadata partitions using {@link RemoteLogMetadataTopicPartitioner#metadataPartition(TopicIdPartition)}.
+ * Any later leadership changes arrive through the same APIs. Partitions deleted from this broker are removed
+ * when they are received through {@link #removeAssignmentsForPartitions(Set)}.
+ * <p>
+ * After receiving these events it invokes {@link RemotePartitionMetadataEventHandler#handleRemoteLogSegmentMetadata(RemoteLogSegmentMetadata)},
+ * which maintains in-memory representation of the state of {@link RemoteLogSegmentMetadata}.
+ */
+class ConsumerTask implements Runnable, Closeable {
+    private static final Logger log = LoggerFactory.getLogger(ConsumerTask.class);
+
+    private static final long POLL_INTERVAL_MS = 30L;
+
+    private final RemoteLogMetadataSerde serde = new RemoteLogMetadataSerde();
+    private final KafkaConsumer<byte[], byte[]> consumer;
+    private final RemotePartitionMetadataEventHandler remotePartitionMetadataEventHandler;
+    private final RemoteLogMetadataTopicPartitioner topicPartitioner;
+
+    // It indicates whether the closing process has been started. If it is set to true,
+    // the consumer will stop consuming messages and will not allow partition assignments to be updated.
+    private volatile boolean close = false;
+    private volatile boolean assignPartitions = false;
+
+    private final Object assignPartitionsLock = new Object();
+
+    // Remote log metadata topic partitions that consumer is assigned to.
+    private volatile Set<Integer> assignedMetaPartitions = Collections.emptySet();
+
+    // User topic partitions that this broker is a leader/follower for.
+    private Set<TopicIdPartition> assignedTopicPartitions = Collections.emptySet();
+
+    // Map of remote log metadata topic partition to consumed offsets.
+    private final Map<Integer, Long> partitionToConsumedOffsets = new ConcurrentHashMap<>();
+
+    public ConsumerTask(KafkaConsumer<byte[], byte[]> consumer,
+                        RemotePartitionMetadataEventHandler remotePartitionMetadataEventHandler,
+                        RemoteLogMetadataTopicPartitioner topicPartitioner) {
+        this.consumer = consumer;
+        this.remotePartitionMetadataEventHandler = remotePartitionMetadataEventHandler;
+        this.topicPartitioner = topicPartitioner;
+    }
+
+    @Override
+    public void run() {
+        log.info("Started Consumer task thread.");
+        try {
+            while (!close) {
+                Set<Integer> assignedMetaPartitionsSnapshot = maybeWaitForPartitionsAssignment();
+
+                if (!assignedMetaPartitionsSnapshot.isEmpty()) {
+                    executeReassignment(assignedMetaPartitionsSnapshot);
+                }
+
+                log.info("Polling consumer to receive remote log metadata topic records");
+                ConsumerRecords<byte[], byte[]> consumerRecords
+                        = consumer.poll(Duration.ofMillis(POLL_INTERVAL_MS));
+                for (ConsumerRecord<byte[], byte[]> record : consumerRecords) {
+                    handleRemoteLogMetadata(serde.deserialize(record.value()));
+                    partitionToConsumedOffsets.put(record.partition(), record.offset());
+                }
+            }
+        } catch (Exception e) {
+            log.error("Error occurred in consumer task, close:[{}]", close, e);
+        }
+
+        closeConsumer();
+        log.info("Exiting from consumer task thread");
+    }
+
+    private void closeConsumer() {
+        log.info("Closing the consumer instance");
+        if (consumer != null) {
+            try {
+                consumer.close(Duration.ofSeconds(30));
+            } catch (Exception e) {
+                log.error("Error encountered while closing the consumer", e);
+            }
+        }
+    }
+
+    private Set<Integer> maybeWaitForPartitionsAssignment() {
+        Set<Integer> assignedMetaPartitionsSnapshot = Collections.emptySet();
+        synchronized (assignPartitionsLock) {
+            while (assignedMetaPartitions.isEmpty()) {
+                // If no partitions are assigned, wait until they are assigned.
+                log.info("Waiting for assigned remote log metadata partitions..");
+                try {
+                    assignPartitionsLock.wait();
+                } catch (InterruptedException e) {
+                    throw new KafkaException(e);
+                }
+            }
+
+            if (assignPartitions) {
+                assignedMetaPartitionsSnapshot = new HashSet<>(assignedMetaPartitions);
+                assignPartitions = false;
+            }
+        }
+        return assignedMetaPartitionsSnapshot;
+    }
+
+    private void handleRemoteLogMetadata(RemoteLogMetadata remoteLogMetadata) {
+        if (assignedTopicPartitions.contains(remoteLogMetadata.topicIdPartition())) {
+            remotePartitionMetadataEventHandler.handleRemoteLogMetadata(remoteLogMetadata);
+        } else {
+            log.debug("This event {} is skipped as the topic partition is not assigned for this instance.", remoteLogMetadata);
+        }
+    }
+
+    private void executeReassignment(Set<Integer> assignedMetaPartitionsSnapshot) {
+        Set<TopicPartition> assignedMetaTopicPartitions = assignedMetaPartitionsSnapshot.stream()
+                .map(partitionNum -> new TopicPartition(REMOTE_LOG_METADATA_TOPIC_NAME, partitionNum))
+                .collect(Collectors.toSet());
+        log.info("Reassigning partitions to consumer task [{}]", assignedMetaTopicPartitions);
+        consumer.assign(assignedMetaTopicPartitions);
+    }
+
+    public void addAssignmentsForPartitions(Set<TopicIdPartition> updatedPartitions) {
+        updateAssignmentsForPartitions(updatedPartitions, Collections.emptySet());
+    }
+
+    public void removeAssignmentsForPartitions(Set<TopicIdPartition> partitions) {
+        updateAssignmentsForPartitions(Collections.emptySet(), partitions);
+    }
+
+    private void updateAssignmentsForPartitions(Set<TopicIdPartition> addedPartitions,
+                                                Set<TopicIdPartition> removedPartitions) {
+        log.info("Updating assignments for addedPartitions: {} and removedPartitions: {}", addedPartitions, removedPartitions);
+        ensureNotClosed();
+
+        Objects.requireNonNull(addedPartitions, "addedPartitions must not be null");
+        Objects.requireNonNull(removedPartitions, "removedPartitions must not be null");
+
+        if (addedPartitions.isEmpty() && removedPartitions.isEmpty()) {
+            return;
+        }
+
+        synchronized (assignPartitionsLock) {
+            Set<TopicIdPartition> updatedReassignedPartitions = new HashSet<>(assignedTopicPartitions);
+            updatedReassignedPartitions.addAll(addedPartitions);
+            updatedReassignedPartitions.removeAll(removedPartitions);
+            Set<Integer> updatedAssignedMetaPartitions = new HashSet<>();
+            for (TopicIdPartition tp : updatedReassignedPartitions) {
+                updatedAssignedMetaPartitions.add(topicPartitioner.metadataPartition(tp));
+            }
+            assignedTopicPartitions = Collections.unmodifiableSet(updatedReassignedPartitions);
+            log.debug("Assigned topic partitions: {}", assignedTopicPartitions);
+
+            if (!updatedAssignedMetaPartitions.equals(assignedMetaPartitions)) {
+                assignedMetaPartitions = Collections.unmodifiableSet(updatedAssignedMetaPartitions);
+                log.debug("Assigned metadata topic partitions: {}", assignedMetaPartitions);
+                assignPartitions = true;
+                assignPartitionsLock.notifyAll();
+            } else {
+                log.debug("No change in assigned metadata topic partitions: {}", assignedMetaPartitions);
+            }
+        }
+    }
+
+    public Optional<Long> receivedOffsetForPartition(int partition) {
+        return Optional.ofNullable(partitionToConsumedOffsets.get(partition));
+    }
+
+    public boolean assignedPartition(int partition) {
+        return assignedMetaPartitions.contains(partition);
+    }
+
+    private void ensureNotClosed() {
+        if (close) {
+            throw new IllegalStateException("This instance is already closed");
+        }
+    }
+
+    public void close() {
+        if (!close) {

Review comment:
       It is not good to use the same name (`close`) for both a field and a method.
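
       A sketch of one way to avoid the collision (the field name `closing` is my suggestion):

           // Renamed field so it no longer clashes with the close() method.
           private volatile boolean closing = false;

           public void close() {
               if (!closing) {
                   closing = true;
                   // proceed with shutdown
               }
           }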

##########
File path: storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ProducerManager.java
##########
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.metadata.storage;
+
+import org.apache.kafka.clients.producer.Callback;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.TopicIdPartition;
+import org.apache.kafka.server.log.remote.metadata.storage.serialization.RemoteLogMetadataSerde;
+import org.apache.kafka.server.log.remote.storage.RemoteLogMetadata;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.time.Duration;
+
+/**
+ * This class is responsible for publishing messages into the remote log metadata topic partitions.
+ */
+public class ProducerManager implements Closeable {
+    private static final Logger log = LoggerFactory.getLogger(ProducerManager.class);
+
+    private final RemoteLogMetadataSerde serde = new RemoteLogMetadataSerde();
+    private final KafkaProducer<byte[], byte[]> producer;
+    private final RemoteLogMetadataTopicPartitioner topicPartitioner;
+    private final TopicBasedRemoteLogMetadataManagerConfig rlmmConfig;
+
+    // It indicates whether the closing process has been started. No messages are accepted
+    // once it is set to true.
+    private volatile boolean close = false;
+
+    public ProducerManager(TopicBasedRemoteLogMetadataManagerConfig rlmmConfig,
+                           RemoteLogMetadataTopicPartitioner rlmmTopicPartitioner) {
+        this.rlmmConfig = rlmmConfig;
+        this.producer = new KafkaProducer<>(rlmmConfig.producerProperties());
+        topicPartitioner = rlmmTopicPartitioner;
+    }
+
+    public RecordMetadata publishMessage(RemoteLogMetadata remoteLogMetadata) throws KafkaException {
+        ensureNotClosed();
+
+        TopicIdPartition topicIdPartition = remoteLogMetadata.topicIdPartition();
+        int metadataPartitionNum = topicPartitioner.metadataPartition(topicIdPartition);
+        log.debug("Publishing metadata message of partition:[{}] into metadata topic partition:[{}] with payload: [{}]",
+                topicIdPartition, metadataPartitionNum, remoteLogMetadata);
+
+        ProducerCallback callback = new ProducerCallback();
+        try {
+            if (metadataPartitionNum >= rlmmConfig.metadataTopicPartitionsCount()) {
+                // This should never occur as long as metadata partitions always remain the same.
+                throw new KafkaException("Chosen partition no " + metadataPartitionNum +
+                                         " is more than the partition count: " + rlmmConfig.metadataTopicPartitionsCount());

Review comment:
       Greater than or equal to? The condition is `>=`, but the exception message says the chosen partition "is more than" the partition count.
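
       A one-line sketch of the corrected message:

           throw new KafkaException("Chosen partition no " + metadataPartitionNum +
                   " is greater than or equal to the partition count: " + rlmmConfig.metadataTopicPartitionsCount());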

##########
File path: storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTask.java
##########
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.metadata.storage;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.TopicIdPartition;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.server.log.remote.metadata.storage.serialization.RemoteLogMetadataSerde;
+import org.apache.kafka.server.log.remote.storage.RemoteLogMetadata;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.time.Duration;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+
+import static org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_TOPIC_NAME;
+
+/**
+ * This class is responsible for consuming messages from remote log metadata topic ({@link TopicBasedRemoteLogMetadataManagerConfig#REMOTE_LOG_METADATA_TOPIC_NAME})
+ * partitions and maintains the state of the remote log segment metadata. It provides APIs to add or remove
+ * the topic partitions whose metadata should be consumed by this instance, using
+ * {@link #addAssignmentsForPartitions(Set)} and {@link #removeAssignmentsForPartitions(Set)} respectively.
+ * <p>
+ * When a broker is started, the controller sends the topic partitions that this broker is a leader or follower for,
+ * together with the partitions to be deleted. This class receives those notifications via
+ * {@link #addAssignmentsForPartitions(Set)} and {@link #removeAssignmentsForPartitions(Set)} and assigns the consumer to the
+ * respective remote log metadata partitions using {@link RemoteLogMetadataTopicPartitioner#metadataPartition(TopicIdPartition)}.
+ * Any later leadership changes arrive through the same APIs. Partitions deleted from this broker are removed
+ * when they are received through {@link #removeAssignmentsForPartitions(Set)}.
+ * <p>
+ * After receiving these events it invokes {@link RemotePartitionMetadataEventHandler#handleRemoteLogSegmentMetadata(RemoteLogSegmentMetadata)},
+ * which maintains in-memory representation of the state of {@link RemoteLogSegmentMetadata}.
+ */
+class ConsumerTask implements Runnable, Closeable {
+    private static final Logger log = LoggerFactory.getLogger(ConsumerTask.class);
+
+    private static final long POLL_INTERVAL_MS = 30L;
+
+    private final RemoteLogMetadataSerde serde = new RemoteLogMetadataSerde();
+    private final KafkaConsumer<byte[], byte[]> consumer;
+    private final RemotePartitionMetadataEventHandler remotePartitionMetadataEventHandler;
+    private final RemoteLogMetadataTopicPartitioner topicPartitioner;
+
+    // It indicates whether the closing process has been started. If it is set to true,
+    // the consumer will stop consuming messages and will not allow partition assignments to be updated.
+    private volatile boolean close = false;
+    private volatile boolean assignPartitions = false;

Review comment:
       Can you explain what this variable means? A comment on the field would help.
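
       For example, something along these lines; this is only my reading of the flag from the surrounding code, so please correct it if I got it wrong:

           // Set to true when the set of assigned metadata partitions has changed and the
           // consumer task still needs to call assign() with the new snapshot; reset to
           // false once the consumer task picks up the new assignment.
           private volatile boolean assignPartitions = false;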

##########
File path: storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManager.java
##########
@@ -0,0 +1,336 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.metadata.storage;
+
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.TopicIdPartition;
+import org.apache.kafka.common.errors.RetriableException;
+import org.apache.kafka.common.utils.KafkaThread;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.log.remote.storage.RemoteLogMetadata;
+import org.apache.kafka.server.log.remote.storage.RemoteLogMetadataManager;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadataUpdate;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentState;
+import org.apache.kafka.server.log.remote.storage.RemotePartitionDeleteMetadata;
+import org.apache.kafka.server.log.remote.storage.RemoteStorageException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+/**
+ * This is the {@link RemoteLogMetadataManager} implementation with storage as an internal topic with name {@link TopicBasedRemoteLogMetadataManagerConfig#REMOTE_LOG_METADATA_TOPIC_NAME}.
+ * This is used to publish and fetch {@link RemoteLogMetadata} for the registered user topic partitions with
+ * {@link #onPartitionLeadershipChanges(Set, Set)}. Each broker will have an instance of this class and it subscribes
+ * to metadata updates for the registered user topic partitions.
+ */
+public class TopicBasedRemoteLogMetadataManager implements RemoteLogMetadataManager {
+    private static final Logger log = LoggerFactory.getLogger(TopicBasedRemoteLogMetadataManager.class);
+
+    private volatile boolean configured = false;
+
+    // It indicates whether the closing of this instance has been started via the #close() method.
+    // Using AtomicBoolean instead of volatile as it may encounter http://findbugs.sourceforge.net/bugDescriptions.html#SP_SPIN_ON_FIELD
+    // if the field is read but not updated in a spin loop like in #initializeResources() method.
+    private final AtomicBoolean close = new AtomicBoolean(false);
+    private final AtomicBoolean initialized = new AtomicBoolean(false);
+
+    private Thread initializationThread;
+    private Time time = Time.SYSTEM;
+    private volatile ProducerManager producerManager;
+    private volatile ConsumerManager consumerManager;
+
+    // This allows gracefully closing this instance via the {@link #close()} method while there are pending or new
+    // requests calling other methods that use resources like the producer/consumer managers.
+    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+
+    private final RemotePartitionMetadataStore remotePartitionMetadataStore = new RemotePartitionMetadataStore();
+    private volatile TopicBasedRemoteLogMetadataManagerConfig rlmmConfig;
+    private RemoteLogMetadataTopicPartitioner rlmmTopicPartitioner;
+
+    public TopicBasedRemoteLogMetadataManager() {
+    }
+
+    // Visible for testing.
+    public TopicBasedRemoteLogMetadataManager(Time time) {
+        this.time = time;
+    }
+
+    @Override
+    public void addRemoteLogSegmentMetadata(RemoteLogSegmentMetadata remoteLogSegmentMetadata)
+            throws RemoteStorageException {
+        Objects.requireNonNull(remoteLogSegmentMetadata, "remoteLogSegmentMetadata can not be null");
+
+        ensureInitializedAndNotClosed();
+
+        // This allows gracefully rejecting the requests while closing of this instance is in progress, which triggers
+        // closing the producer/consumer manager instances.
+        lock.readLock().lock();
+        try {
+
+            // This method only allows adding a remote log segment with the initial state
+            // (RemoteLogSegmentState.COPY_SEGMENT_STARTED); it must not update existing remote log segment metadata.
+            if (remoteLogSegmentMetadata.state() != RemoteLogSegmentState.COPY_SEGMENT_STARTED) {
+                throw new IllegalArgumentException(
+                        "Given remoteLogSegmentMetadata should have state as " + RemoteLogSegmentState.COPY_SEGMENT_STARTED
+                        + " but it contains state as: " + remoteLogSegmentMetadata.state());
+            }
+
+            // Publish the message to the topic.
+            doPublishMetadata(remoteLogSegmentMetadata.remoteLogSegmentId().topicIdPartition(),
+                              remoteLogSegmentMetadata);
+        } finally {
+            lock.readLock().unlock();
+        }
+    }
+
+    @Override
+    public void updateRemoteLogSegmentMetadata(RemoteLogSegmentMetadataUpdate segmentMetadataUpdate)
+            throws RemoteStorageException {
+        Objects.requireNonNull(segmentMetadataUpdate, "segmentMetadataUpdate can not be null");
+
+        ensureInitializedAndNotClosed();
+
+        lock.readLock().lock();
+        try {
+            // Callers should use addRemoteLogSegmentMetadata to add RemoteLogSegmentMetadata with state as
+            // RemoteLogSegmentState.COPY_SEGMENT_STARTED.
+            if (segmentMetadataUpdate.state() == RemoteLogSegmentState.COPY_SEGMENT_STARTED) {
+                throw new IllegalArgumentException("Given remoteLogSegmentMetadata should not have the state as: "
+                                                   + RemoteLogSegmentState.COPY_SEGMENT_STARTED);
+            }
+
+            // Publish the message to the topic.
+            doPublishMetadata(segmentMetadataUpdate.remoteLogSegmentId().topicIdPartition(), segmentMetadataUpdate);
+        } finally {
+            lock.readLock().unlock();
+        }
+    }
+
+    @Override
+    public void putRemotePartitionDeleteMetadata(RemotePartitionDeleteMetadata remotePartitionDeleteMetadata)
+            throws RemoteStorageException {
+        Objects.requireNonNull(remotePartitionDeleteMetadata, "remotePartitionDeleteMetadata can not be null");
+
+        ensureInitializedAndNotClosed();
+
+        lock.readLock().lock();
+        try {
+
+            doPublishMetadata(remotePartitionDeleteMetadata.topicIdPartition(), remotePartitionDeleteMetadata);
+        } finally {
+            lock.readLock().unlock();
+        }
+    }
+
+    private void doPublishMetadata(TopicIdPartition topicIdPartition, RemoteLogMetadata remoteLogMetadata)
+            throws RemoteStorageException {
+        log.debug("Publishing metadata for partition: [{}] with context: [{}]", topicIdPartition, remoteLogMetadata);
+        ensureInitializedAndNotClosed();
+
+        try {
+            // Publish the message to the topic.
+            RecordMetadata recordMetadata = producerManager.publishMessage(
+                    remoteLogMetadata);
+            // Wait until the consumer catches up with this offset. This will ensure read-after-write consistency
+            // semantics.
+            consumerManager.waitTillConsumptionCatchesUp(recordMetadata);
+        } catch (KafkaException e) {
+            if (e instanceof RetriableException) {
+                throw e;
+            } else {
+                throw new RemoteStorageException(e);
+            }
+        }
+    }
+
+    @Override
+    public Optional<RemoteLogSegmentMetadata> remoteLogSegmentMetadata(TopicIdPartition topicIdPartition,
+                                                                       int epochForOffset,
+                                                                       long offset)
+            throws RemoteStorageException {
+        ensureInitializedAndNotClosed();
+
+        return remotePartitionMetadataStore.remoteLogSegmentMetadata(topicIdPartition, offset, epochForOffset);
+    }
+
+    @Override
+    public Optional<Long> highestOffsetForEpoch(TopicIdPartition topicIdPartition, int leaderEpoch)
+            throws RemoteStorageException {
+        ensureInitializedAndNotClosed();
+
+        return remotePartitionMetadataStore.highestLogOffset(topicIdPartition, leaderEpoch);
+    }
+
+    @Override
+    public Iterator<RemoteLogSegmentMetadata> listRemoteLogSegments(TopicIdPartition topicIdPartition)
+            throws RemoteStorageException {
+        Objects.requireNonNull(topicIdPartition, "topicIdPartition can not be null");
+        ensureInitializedAndNotClosed();
+
+        return remotePartitionMetadataStore.listRemoteLogSegments(topicIdPartition);
+    }
+
+    @Override
+    public Iterator<RemoteLogSegmentMetadata> listRemoteLogSegments(TopicIdPartition topicIdPartition, int leaderEpoch)
+            throws RemoteStorageException {
+        Objects.requireNonNull(topicIdPartition, "topicIdPartition can not be null");
+        ensureInitializedAndNotClosed();
+
+        return remotePartitionMetadataStore.listRemoteLogSegments(topicIdPartition, leaderEpoch);

Review comment:
       There is a race condition here. It is possible that `close == false` when `ensureInitializedAndNotClosed()` is called, while a `close()` call from another thread completes before `remotePartitionMetadataStore.listRemoteLogSegments(topicIdPartition, leaderEpoch)` runs.
   
   I think we should always grab a read lock before calling `ensureInitializedAndNotClosed()`, or use some other synchronization scheme.
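   
   Something along the lines of the following rough sketch would close that window, assuming `close()` takes the write lock before flipping the flag (names follow the surrounding code; this just spells out the suggestion, not the PR's actual implementation):
   
   ```java
   @Override
   public Iterator<RemoteLogSegmentMetadata> listRemoteLogSegments(TopicIdPartition topicIdPartition, int leaderEpoch)
           throws RemoteStorageException {
       Objects.requireNonNull(topicIdPartition, "topicIdPartition can not be null");
   
       lock.readLock().lock();
       try {
           // Check the flag only after acquiring the read lock, so a concurrent close()
           // (which needs the write lock) cannot complete between the check and the read below.
           ensureInitializedAndNotClosed();
           return remotePartitionMetadataStore.listRemoteLogSegments(topicIdPartition, leaderEpoch);
       } finally {
           lock.readLock().unlock();
       }
   }
   ```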

##########
File path: storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTask.java
##########
@@ -0,0 +1,253 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.metadata.storage;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.TopicIdPartition;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.server.log.remote.metadata.storage.serialization.RemoteLogMetadataSerde;
+import org.apache.kafka.server.log.remote.storage.RemoteLogMetadata;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.time.Duration;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+
+import static org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_TOPIC_NAME;
+
+/**
+ * This class is responsible for consuming messages from the remote log metadata topic ({@link TopicBasedRemoteLogMetadataManagerConfig#REMOTE_LOG_METADATA_TOPIC_NAME})
+ * partitions and maintaining the state of the remote log segment metadata. It provides an API to add or remove
+ * the topic partitions whose metadata should be consumed by this instance, using
+ * {@link #addAssignmentsForPartitions(Set)} and {@link #removeAssignmentsForPartitions(Set)} respectively.
+ * <p>
+ * When a broker is started, the controller sends the topic partitions that this broker is a leader or follower
+ * for, as well as the partitions to be deleted. This class receives those notifications through
+ * {@link #addAssignmentsForPartitions(Set)} and {@link #removeAssignmentsForPartitions(Set)} and assigns the
+ * consumer to the respective remote log metadata partitions using {@link RemoteLogMetadataTopicPartitioner#metadataPartition(TopicIdPartition)}.
+ * Any later leadership changes are communicated through the same APIs. Partitions that are deleted from
+ * this broker are removed through {@link #removeAssignmentsForPartitions(Set)}.
+ * <p>
+ * After receiving these events, it invokes {@link RemotePartitionMetadataEventHandler#handleRemoteLogSegmentMetadata(RemoteLogSegmentMetadata)},
+ * which maintains an in-memory representation of the state of {@link RemoteLogSegmentMetadata}.
+ */
+class ConsumerTask implements Runnable, Closeable {
+    private static final Logger log = LoggerFactory.getLogger(ConsumerTask.class);
+
+    private static final long POLL_INTERVAL_MS = 30L;
+
+    private final RemoteLogMetadataSerde serde = new RemoteLogMetadataSerde();
+    private final KafkaConsumer<byte[], byte[]> consumer;
+    private final RemotePartitionMetadataEventHandler remotePartitionMetadataEventHandler;
+    private final RemoteLogMetadataTopicPartitioner topicPartitioner;
+
+    private volatile boolean close = false;
+    private volatile boolean assignPartitions = false;
+
+    private final Object assignPartitionsLock = new Object();
+
+    // Remote log metadata topic partitions that the consumer is assigned to.
+    private volatile Set<Integer> assignedMetaPartitions = Collections.emptySet();
+
+    // User topic partitions that this broker is a leader/follower for.
+    private Set<TopicIdPartition> assignedTopicPartitions = Collections.emptySet();
+
+    // Map of remote log metadata topic partition to target end offsets to be consumed.
+    private final Map<Integer, Long> partitionToTargetEndOffsets = new ConcurrentHashMap<>();
+
+    // Map of remote log metadata topic partition to consumed offsets.
+    private final Map<Integer, Long> partitionToConsumedOffsets = new ConcurrentHashMap<>();
+
+    public ConsumerTask(KafkaConsumer<byte[], byte[]> consumer,
+                        RemotePartitionMetadataEventHandler remotePartitionMetadataEventHandler,
+                        RemoteLogMetadataTopicPartitioner topicPartitioner) {
+        this.consumer = consumer;
+        this.remotePartitionMetadataEventHandler = remotePartitionMetadataEventHandler;
+        this.topicPartitioner = topicPartitioner;
+    }
+
+    @Override
+    public void run() {
+        log.info("Started Consumer task thread.");
+        try {
+            while (!close) {
+                Set<Integer> assignedMetaPartitionsSnapshot = maybeWaitForPartitionsAssignment();
+
+                if (!assignedMetaPartitionsSnapshot.isEmpty()) {
+                    executeReassignment(assignedMetaPartitionsSnapshot);
+                }
+
+                log.info("Polling consumer to receive remote log metadata topic records");
+                ConsumerRecords<byte[], byte[]> consumerRecords
+                        = consumer.poll(Duration.ofSeconds(POLL_INTERVAL_MS));
+                for (ConsumerRecord<byte[], byte[]> record : consumerRecords) {
+                    handleRemoteLogMetadata(serde.deserialize(record.value()));
+                    partitionToConsumedOffsets.put(record.partition(), record.offset());
+                }
+
+                // Check whether messages have been received up to the end offsets for the assigned metadata partitions.
+                if (!partitionToTargetEndOffsets.isEmpty()) {
+                    for (Map.Entry<Integer, Long> entry : partitionToTargetEndOffsets.entrySet()) {
+                        final Long offset = partitionToConsumedOffsets.getOrDefault(entry.getKey(), 0L);
+                        // The end offset is the offset of the next record to be written, so the partition
+                        // is caught up once the last consumed offset reaches endOffset - 1.
+                        if (offset >= entry.getValue() - 1) {
+                            partitionToTargetEndOffsets.remove(entry.getKey());
+                        }
+                    }
+                }
+            }
+        } catch (Exception e) {
+            log.error("Error occurred in consumer task, close:[{}]", close, e);
+        }
+
+        closeConsumer();
+        log.info("Exiting from consumer task thread");
+    }
+
+    private void closeConsumer() {
+        log.info("Closing the consumer instance");
+        if (consumer != null) {
+            try {
+                consumer.close(Duration.ofSeconds(30));
+            } catch (Exception e) {
+                log.error("Error encountered while closing the consumer", e);
+            }
+        }
+    }
+
+    private Set<Integer> maybeWaitForPartitionsAssignment() {
+        Set<Integer> assignedMetaPartitionsSnapshot = Collections.emptySet();
+        synchronized (assignPartitionsLock) {
+            while (assignedMetaPartitions.isEmpty()) {
+                // If no partitions are assigned, wait until they are assigned.
+                log.info("Waiting for assigned remote log metadata partitions..");
+                try {
+                    assignPartitionsLock.wait();
+                } catch (InterruptedException e) {
+                    throw new KafkaException(e);
+                }
+            }
+
+            if (assignPartitions) {
+                assignedMetaPartitionsSnapshot = new HashSet<>(assignedMetaPartitions);
+                assignPartitions = false;
+            }
+        }
+        return assignedMetaPartitionsSnapshot;
+    }
+
+    private void handleRemoteLogMetadata(RemoteLogMetadata remoteLogMetadata) {
+        remotePartitionMetadataEventHandler.handleRemoteLogMetadata(remoteLogMetadata);
+    }
+
+    private void executeReassignment(Set<Integer> assignedMetaPartitionsSnapshot) {
+        Set<TopicPartition> assignedMetaTopicPartitions = assignedMetaPartitionsSnapshot.stream()
+                .map(partitionNum -> new TopicPartition(REMOTE_LOG_METADATA_TOPIC_NAME, partitionNum))
+                .collect(Collectors.toSet());
+        log.info("Reassigning partitions to consumer task [{}]", assignedMetaTopicPartitions);
+        consumer.assign(assignedMetaTopicPartitions);
+
+        log.debug("Fetching end offsets to consumer task [{}]", assignedMetaTopicPartitions);
+        Map<TopicPartition, Long> endOffsets;
+        while (true) {
+            try {
+                endOffsets = consumer.endOffsets(assignedMetaTopicPartitions, Duration.ofSeconds(30));
+                break;
+            } catch (Exception e) {
+                // Ignore the error and retry fetching the end offsets.
+                log.debug("Error encountered in fetching end offsets", e);
+            }
+        }
+        log.debug("Fetched end offsets to consumer task [{}]", endOffsets);
+
+        for (Map.Entry<TopicPartition, Long> entry : endOffsets.entrySet()) {
+            if (entry.getValue() > 0) {
+                partitionToTargetEndOffsets.put(entry.getKey().partition(), entry.getValue());
+            }
+        }
+    }
+
+    public void addAssignmentsForPartitions(Set<TopicIdPartition> updatedPartitions) {
+        updateAssignmentsForPartitions(updatedPartitions, Collections.emptySet());
+    }
+
+    public void removeAssignmentsForPartitions(Set<TopicIdPartition> partitions) {
+        updateAssignmentsForPartitions(Collections.emptySet(), partitions);
+    }
+
+    private void updateAssignmentsForPartitions(Set<TopicIdPartition> addedPartitions,
+                                                Set<TopicIdPartition> removedPartitions) {
+        log.info("Updating assignments for addedPartitions: {} and removedPartition: {}", addedPartitions, removedPartitions);
+        ensureNotClosed();
+
+        Objects.requireNonNull(addedPartitions, "addedPartitions must not be null");
+        Objects.requireNonNull(removedPartitions, "removedPartitions must not be null");
+
+        if (addedPartitions.isEmpty() && removedPartitions.isEmpty()) {
+            return;
+        }
+
+        synchronized (assignPartitionsLock) {
+            Set<TopicIdPartition> updatedReassignedPartitions = new HashSet<>(assignedTopicPartitions);
+            updatedReassignedPartitions.addAll(addedPartitions);
+            updatedReassignedPartitions.removeAll(removedPartitions);
+            Set<Integer> updatedAssignedMetaPartitions = new HashSet<>();
+            for (TopicIdPartition tp : updatedReassignedPartitions) {
+                updatedAssignedMetaPartitions.add(topicPartitioner.metadataPartition(tp));
+            }
+
+            if (!updatedAssignedMetaPartitions.equals(assignedMetaPartitions)) {
+                assignedTopicPartitions = Collections.unmodifiableSet(updatedReassignedPartitions);
+                assignedMetaPartitions = Collections.unmodifiableSet(updatedAssignedMetaPartitions);

Review comment:
       What is the cost of consuming from the beginning if the remote metadata partition grows huge?
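   
   If the last consumed offsets were snapshotted somewhere, the task could seek close to where it left off instead of replaying the whole partition on every reassignment. A purely illustrative sketch (the `readOffsetsSnapshot` persistence helper is hypothetical, not part of this PR):
   
   ```java
   // Hypothetical sketch: resume from previously consumed offsets on reassignment,
   // so a huge metadata partition is not replayed from the beginning every time.
   private void seekToLastConsumedOffsets(Set<TopicPartition> assignedMetaTopicPartitions) {
       Map<Integer, Long> snapshot = readOffsetsSnapshot(); // hypothetical persistence helper
       for (TopicPartition tp : assignedMetaTopicPartitions) {
           Long lastConsumed = snapshot.get(tp.partition());
           if (lastConsumed != null) {
               consumer.seek(tp, lastConsumed + 1); // resume after the last processed record
           } else {
               consumer.seekToBeginning(Collections.singleton(tp));
           }
       }
   }
   ```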




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org