Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/06/30 14:29:20 UTC

[GitHub] [kafka] mumrah commented on a change in pull request #10931: KAFKA-12998: Implement broker-side KRaft snapshots (WIP)

mumrah commented on a change in pull request #10931:
URL: https://github.com/apache/kafka/pull/10931#discussion_r661516512



##########
File path: metadata/src/main/java/org/apache/kafka/image/ClientQuotasImage.java
##########
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.image;
+
+import org.apache.kafka.common.message.DescribeClientQuotasRequestData;
+import org.apache.kafka.common.message.DescribeClientQuotasResponseData;
+import org.apache.kafka.common.message.DescribeClientQuotasResponseData.EntityData;
+import org.apache.kafka.common.message.DescribeClientQuotasResponseData.EntryData;
+import org.apache.kafka.common.quota.ClientQuotaEntity;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+
+import static org.apache.kafka.common.protocol.Errors.INVALID_REQUEST;
+import static org.apache.kafka.common.protocol.Errors.UNSUPPORTED_VERSION;
+import static org.apache.kafka.common.requests.DescribeClientQuotasRequest.MATCH_TYPE_EXACT;
+import static org.apache.kafka.common.requests.DescribeClientQuotasRequest.MATCH_TYPE_DEFAULT;
+import static org.apache.kafka.common.requests.DescribeClientQuotasRequest.MATCH_TYPE_SPECIFIED;
+
+
+/**
+ * Represents the client quotas in the metadata image.
+ *
+ * This class is thread-safe.
+ */
+public final class ClientQuotasImage {
+    public final static ClientQuotasImage EMPTY = new ClientQuotasImage(Collections.emptyMap());
+
+    private final Map<ClientQuotaEntity, ClientQuotaImage> entities;
+
+    public ClientQuotasImage(Map<ClientQuotaEntity, ClientQuotaImage> entities) {
+        this.entities = entities;
+    }
+
+    public boolean isEmpty() {
+        return entities.isEmpty();
+    }
+
+    Map<ClientQuotaEntity, ClientQuotaImage> entities() {
+        return entities;
+    }
+
+    public void write(Consumer<List<ApiMessageAndVersion>> out) throws IOException {
+        for (Entry<ClientQuotaEntity, ClientQuotaImage> entry : entities.entrySet()) {
+            ClientQuotaEntity entity = entry.getKey();
+            ClientQuotaImage clientQuotaImage = entry.getValue();
+            clientQuotaImage.write(entity, out);
+        }
+    }
+
+    public DescribeClientQuotasResponseData describe(DescribeClientQuotasRequestData request) {
+        DescribeClientQuotasResponseData response = new DescribeClientQuotasResponseData();
+        Map<String, String> exact = new HashMap<>();
+        Set<String> any = new HashSet<>();
+        for (DescribeClientQuotasRequestData.ComponentData component : request.components()) {
+            switch (component.matchType()) {
+                case MATCH_TYPE_EXACT:
+                    if (component.match() == null) {
+                        response.setErrorCode(INVALID_REQUEST.code());
+                        response.setErrorMessage("Request specified MATCH_TYPE_EXACT, " +
+                            "but set match string to null.");
+                        return response;
+                    }
+                    exact.put(component.entityType(), component.match());
+                    break;
+                case MATCH_TYPE_DEFAULT:
+                    if (component.match() != null) {
+                        response.setErrorCode(INVALID_REQUEST.code());
+                        response.setErrorMessage("Request specified MATCH_TYPE_DEFAULT, " +
+                            "but also specified a match string.");
+                        return response;
+                    }
+                    exact.put(component.entityType(), null);
+                    break;
+                case MATCH_TYPE_SPECIFIED:
+                    if (component.match() != null) {
+                        response.setErrorCode(INVALID_REQUEST.code());
+                        response.setErrorMessage("Request specified MATCH_TYPE_SPECIFIED, " +
+                            "but also specified a match string.");
+                        return response;
+                    }
+                    any.add(component.entityType());
+                    break;
+                default:
+                    response.setErrorCode(UNSUPPORTED_VERSION.code());
+                    response.setErrorMessage("Unknown match type " + component.matchType());
+                    return response;
+            }
+        }
+        // TODO: this is O(N). We should do some indexing here to speed it up.

Review comment:
       Is there any way to use the cache structures in ClientQuotaCache.scala?
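
   A rough Java sketch of the kind of per-type index the earlier TODO ("this is O(N)") hints at, purely illustrative: `ClientQuotaIndex` and its methods are hypothetical, not existing code, and the open question is whether the structures already in ClientQuotaCache.scala could be reused instead.

```java
// Illustrative only: a hypothetical secondary index, keyed by entity type and then
// entity name, so describe() could answer exact matches without scanning every
// ClientQuotaEntity on each request.
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

import org.apache.kafka.common.quota.ClientQuotaEntity;

final class ClientQuotaIndex {
    // entityType -> entityName (null for the default entity) -> entities containing that part
    private final Map<String, Map<String, Set<ClientQuotaEntity>>> byTypeAndName = new HashMap<>();

    void add(ClientQuotaEntity entity) {
        for (Map.Entry<String, String> part : entity.entries().entrySet()) {
            byTypeAndName
                .computeIfAbsent(part.getKey(), t -> new HashMap<>())
                .computeIfAbsent(part.getValue(), n -> new HashSet<>())
                .add(entity);
        }
    }

    // Exact lookup against the index rather than a linear scan of all entities.
    Set<ClientQuotaEntity> findExact(String entityType, String entityName) {
        return byTypeAndName
            .getOrDefault(entityType, Collections.emptyMap())
            .getOrDefault(entityName, Collections.emptySet());
    }
}
```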

##########
File path: metadata/src/main/java/org/apache/kafka/image/ClientQuotasImage.java
##########
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.image;
+
+import org.apache.kafka.common.message.DescribeClientQuotasRequestData;
+import org.apache.kafka.common.message.DescribeClientQuotasResponseData;
+import org.apache.kafka.common.message.DescribeClientQuotasResponseData.EntityData;
+import org.apache.kafka.common.message.DescribeClientQuotasResponseData.EntryData;
+import org.apache.kafka.common.quota.ClientQuotaEntity;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+
+import static org.apache.kafka.common.protocol.Errors.INVALID_REQUEST;
+import static org.apache.kafka.common.protocol.Errors.UNSUPPORTED_VERSION;
+import static org.apache.kafka.common.requests.DescribeClientQuotasRequest.MATCH_TYPE_EXACT;
+import static org.apache.kafka.common.requests.DescribeClientQuotasRequest.MATCH_TYPE_DEFAULT;
+import static org.apache.kafka.common.requests.DescribeClientQuotasRequest.MATCH_TYPE_SPECIFIED;
+
+
+/**
+ * Represents the client quotas in the metadata image.
+ *
+ * This class is thread-safe.
+ */
+public final class ClientQuotasImage {
+    public final static ClientQuotasImage EMPTY = new ClientQuotasImage(Collections.emptyMap());
+
+    private final Map<ClientQuotaEntity, ClientQuotaImage> entities;
+
+    public ClientQuotasImage(Map<ClientQuotaEntity, ClientQuotaImage> entities) {
+        this.entities = entities;
+    }
+
+    public boolean isEmpty() {
+        return entities.isEmpty();
+    }
+
+    Map<ClientQuotaEntity, ClientQuotaImage> entities() {
+        return entities;
+    }
+
+    public void write(Consumer<List<ApiMessageAndVersion>> out) throws IOException {
+        for (Entry<ClientQuotaEntity, ClientQuotaImage> entry : entities.entrySet()) {
+            ClientQuotaEntity entity = entry.getKey();
+            ClientQuotaImage clientQuotaImage = entry.getValue();
+            clientQuotaImage.write(entity, out);
+        }
+    }
+
+    public DescribeClientQuotasResponseData describe(DescribeClientQuotasRequestData request) {
+        DescribeClientQuotasResponseData response = new DescribeClientQuotasResponseData();
+        Map<String, String> exact = new HashMap<>();
+        Set<String> any = new HashSet<>();

Review comment:
       How about more explicit names like `exactMatch` and `typeMatch`?

##########
File path: metadata/src/main/java/org/apache/kafka/image/ClientQuotasImage.java
##########
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.image;
+
+import org.apache.kafka.common.message.DescribeClientQuotasRequestData;
+import org.apache.kafka.common.message.DescribeClientQuotasResponseData;
+import org.apache.kafka.common.message.DescribeClientQuotasResponseData.EntityData;
+import org.apache.kafka.common.message.DescribeClientQuotasResponseData.EntryData;
+import org.apache.kafka.common.quota.ClientQuotaEntity;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+
+import static org.apache.kafka.common.protocol.Errors.INVALID_REQUEST;
+import static org.apache.kafka.common.protocol.Errors.UNSUPPORTED_VERSION;
+import static org.apache.kafka.common.requests.DescribeClientQuotasRequest.MATCH_TYPE_EXACT;
+import static org.apache.kafka.common.requests.DescribeClientQuotasRequest.MATCH_TYPE_DEFAULT;
+import static org.apache.kafka.common.requests.DescribeClientQuotasRequest.MATCH_TYPE_SPECIFIED;
+
+
+/**
+ * Represents the client quotas in the metadata image.
+ *
+ * This class is thread-safe.
+ */
+public final class ClientQuotasImage {
+    public final static ClientQuotasImage EMPTY = new ClientQuotasImage(Collections.emptyMap());
+
+    private final Map<ClientQuotaEntity, ClientQuotaImage> entities;
+
+    public ClientQuotasImage(Map<ClientQuotaEntity, ClientQuotaImage> entities) {
+        this.entities = entities;
+    }
+
+    public boolean isEmpty() {
+        return entities.isEmpty();
+    }
+
+    Map<ClientQuotaEntity, ClientQuotaImage> entities() {
+        return entities;
+    }
+
+    public void write(Consumer<List<ApiMessageAndVersion>> out) throws IOException {
+        for (Entry<ClientQuotaEntity, ClientQuotaImage> entry : entities.entrySet()) {
+            ClientQuotaEntity entity = entry.getKey();
+            ClientQuotaImage clientQuotaImage = entry.getValue();
+            clientQuotaImage.write(entity, out);
+        }
+    }
+
+    public DescribeClientQuotasResponseData describe(DescribeClientQuotasRequestData request) {

Review comment:
       We previously checked for duplicates as well as empty entity types on the describe filter.
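
   For reference, a rough sketch of that kind of filter validation, rejecting duplicate and empty entity types before any matching is done; the class, method, and error text below are illustrative, not the previous implementation.

```java
// Illustrative only: validate the describe filter components up front and return an
// error response for duplicate or empty entity types before matching anything.
import java.util.HashSet;
import java.util.Set;

import org.apache.kafka.common.message.DescribeClientQuotasRequestData;
import org.apache.kafka.common.message.DescribeClientQuotasResponseData;

import static org.apache.kafka.common.protocol.Errors.INVALID_REQUEST;

final class DescribeFilterValidator {
    // Returns null when the components are valid, otherwise an error response.
    static DescribeClientQuotasResponseData validate(DescribeClientQuotasRequestData request) {
        Set<String> seenTypes = new HashSet<>();
        for (DescribeClientQuotasRequestData.ComponentData component : request.components()) {
            String entityType = component.entityType();
            if (entityType == null || entityType.isEmpty()) {
                return errorResponse("Unexpected empty entity type in the describe filter.");
            }
            if (!seenTypes.add(entityType)) {
                return errorResponse("Duplicate entity type " + entityType + " in the describe filter.");
            }
        }
        return null;
    }

    private static DescribeClientQuotasResponseData errorResponse(String message) {
        return new DescribeClientQuotasResponseData()
            .setErrorCode(INVALID_REQUEST.code())
            .setErrorMessage(message);
    }
}
```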

##########
File path: core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala
##########
@@ -70,230 +58,191 @@ class BrokerMetadataListener(
    */
   @volatile private var _highestMetadataOffset = -1L
 
+  /**
+   * The current broker metadata image. Accessed only from the event queue thread.
+   */
+  private var _image = MetadataImage.EMPTY
+
+  /**
+   * The current metadata delta. Accessed only from the event queue thread.
+   */
+  private var _delta = new MetadataDelta(_image)
+
+  /**
+   * The object to use to publish new metadata changes, or None if this listener has not
+   * been activated yet.
+   */
+  private var _publisher: Option[MetadataPublisher] = None
+
+  /**
+   * The event queue which runs this listener.
+   */
   val eventQueue = new KafkaEventQueue(time, logContext, threadNamePrefix.getOrElse(""))
 
+  /**
+   * Returns the highest metadata-offset. Thread-safe.
+   */
   def highestMetadataOffset(): Long = _highestMetadataOffset
 
   /**
    * Handle new metadata records.
    */
-  override def handleCommit(reader: BatchReader[ApiMessageAndVersion]): Unit = {
+  override def handleCommit(reader: BatchReader[ApiMessageAndVersion]): Unit =
     eventQueue.append(new HandleCommitsEvent(reader))
+
+  class HandleCommitsEvent(reader: BatchReader[ApiMessageAndVersion])
+      extends EventQueue.FailureLoggingEvent(log) {
+    override def run(): Unit = {
+      val results = try {
+        val loadResults = loadBatches(_delta, reader)
+        if (isDebugEnabled) {
+          debug(s"Loaded new commits: ${loadResults}")
+        }
+        loadResults
+      } finally {
+        reader.close()
+      }
+      maybePublish(results.highestMetadataOffset)
+    }
   }
 
   /**
    * Handle metadata snapshots
    */
-  override def handleSnapshot(reader: SnapshotReader[ApiMessageAndVersion]): Unit = {
-    // Loading snapshot on the broker is currently not supported.
-    reader.close();
-    throw new UnsupportedOperationException(s"Loading snapshot (${reader.snapshotId()}) is not supported")
-  }
-
-  // Visible for testing. It's useful to execute events synchronously in order
-  // to make tests deterministic. This object is responsible for closing the reader.
-  private[metadata] def execCommits(batchReader: BatchReader[ApiMessageAndVersion]): Unit = {
-    new HandleCommitsEvent(batchReader).run()
-  }
+  override def handleSnapshot(reader: SnapshotReader[ApiMessageAndVersion]): Unit =
+    eventQueue.append(new HandleSnapshotEvent(reader))
 
-  class HandleCommitsEvent(
-    reader: BatchReader[ApiMessageAndVersion]
-  ) extends EventQueue.FailureLoggingEvent(log) {
+  class HandleSnapshotEvent(reader: SnapshotReader[ApiMessageAndVersion])
+    extends EventQueue.FailureLoggingEvent(log) {
     override def run(): Unit = {
-      try {
-        while (reader.hasNext()) {
-          apply(reader.next())
-        }
+      val results = try {
+        info(s"Loading snapshot ${reader.snapshotId().offset}-${reader.snapshotId().epoch}.")
+        _delta = new MetadataDelta(_image) // Discard any previous deltas.
+        val loadResults = loadBatches(_delta, reader)
+        _delta.finishSnapshot()
+        info(s"Loaded snapshot ${reader.snapshotId().offset}-${reader.snapshotId().epoch}: " +
+          s"${loadResults}")
+        loadResults
       } finally {
         reader.close()
       }
+      maybePublish(results.highestMetadataOffset)
     }
+  }
 
-    private def apply(batch: Batch[ApiMessageAndVersion]): Unit = {
-      val records = batch.records
-      val lastOffset = batch.lastOffset
+  case class BatchLoadResults(numBatches: Int,
+                              numRecords: Int,
+                              elapsedUs: Long,
+                              highestMetadataOffset: Long) {
+    override def toString(): String = {
+      s"${numBatches} batch(es) with ${numRecords} record(s) ending at offset " +
+      s"${highestMetadataOffset} in ${elapsedUs} microseconds"
+    }
+  }
 
-      if (isDebugEnabled) {
-        debug(s"Metadata batch $lastOffset: handling ${records.size()} record(s).")
-      }
-      val imageBuilder =
-        MetadataImageBuilder(brokerId, log, metadataCache.currentImage())
-      val startNs = time.nanoseconds()
+  private def loadBatches(delta: MetadataDelta,
+                         iterator: util.Iterator[Batch[ApiMessageAndVersion]]): BatchLoadResults = {
+    val startTimeNs = time.nanoseconds()
+    var numBatches = 0
+    var numRecords = 0
+    var newHighestMetadataOffset = _highestMetadataOffset
+    while (iterator.hasNext()) {
+      val batch = iterator.next()
       var index = 0
-      metadataBatchSizeHist.update(records.size())
-      records.iterator().asScala.foreach { record =>
-        try {
-          if (isTraceEnabled) {
-            trace("Metadata batch %d: processing [%d/%d]: %s.".format(lastOffset, index + 1,
-              records.size(), record.toString))
-          }
-          handleMessage(imageBuilder, record.message, lastOffset)
-        } catch {
-          case e: Exception => error(s"Unable to handle record $index in batch " +
-            s"ending at offset $lastOffset", e)
-        }
-        index = index + 1
-      }
-      if (imageBuilder.hasChanges) {
-        val newImage = imageBuilder.build()
+      batch.records().forEach { messageAndVersion =>
+        newHighestMetadataOffset = batch.lastOffset()
         if (isTraceEnabled) {
-          trace(s"Metadata batch $lastOffset: creating new metadata image ${newImage}")
-        } else if (isDebugEnabled) {
-          debug(s"Metadata batch $lastOffset: creating new metadata image")
-        }
-        metadataCache.image(newImage)
-      } else if (isDebugEnabled) {
-        debug(s"Metadata batch $lastOffset: no new metadata image required.")
-      }
-      if (imageBuilder.hasPartitionChanges) {
-        if (isDebugEnabled) {
-          debug(s"Metadata batch $lastOffset: applying partition changes")
+          trace("Metadata batch %d: processing [%d/%d]: %s.".format(batch.lastOffset, index + 1,
+            batch.records().size(), messageAndVersion.message().toString()))
         }
-        replicaManager.handleMetadataRecords(imageBuilder, lastOffset,
-          RequestHandlerHelper.onLeadershipChange(groupCoordinator, txnCoordinator, _, _))
-      } else if (isDebugEnabled) {
-        debug(s"Metadata batch $lastOffset: no partition changes found.")
+        delta.replay(messageAndVersion.message())
+        numRecords += 1
+        index += 1
       }
-      _highestMetadataOffset = lastOffset
-      val endNs = time.nanoseconds()
-      val deltaUs = TimeUnit.MICROSECONDS.convert(endNs - startNs, TimeUnit.NANOSECONDS)
-      debug(s"Metadata batch $lastOffset: advanced highest metadata offset in ${deltaUs} " +
-        "microseconds.")
-      batchProcessingTimeHist.update(deltaUs)
+      metadataBatchSizeHist.update(batch.records().size())
+      numBatches = numBatches + 1
     }
+    _highestMetadataOffset = newHighestMetadataOffset
+    val endTimeNs = time.nanoseconds()
+    val elapsedUs = TimeUnit.MICROSECONDS.convert(endTimeNs - startTimeNs, TimeUnit.NANOSECONDS)
+    batchProcessingTimeHist.update(elapsedUs)
+    BatchLoadResults(numBatches, numRecords, elapsedUs, newHighestMetadataOffset)
   }
 
-  private def handleMessage(imageBuilder: MetadataImageBuilder,
-                            record: ApiMessage,
-                            lastOffset: Long): Unit = {
-    val recordType = try {
-      fromId(record.apiKey())
-    } catch {
-      case e: Exception => throw new RuntimeException("Unknown metadata record type " +
-      s"${record.apiKey()} in batch ending at offset ${lastOffset}.")
-    }
-
-    record match {
-      case rec: RegisterBrokerRecord => handleRegisterBrokerRecord(imageBuilder, rec)
-      case rec: UnregisterBrokerRecord => handleUnregisterBrokerRecord(imageBuilder, rec)
-      case rec: FenceBrokerRecord => handleFenceBrokerRecord(imageBuilder, rec)
-      case rec: UnfenceBrokerRecord => handleUnfenceBrokerRecord(imageBuilder, rec)
-      case rec: TopicRecord => handleTopicRecord(imageBuilder, rec)
-      case rec: PartitionRecord => handlePartitionRecord(imageBuilder, rec)
-      case rec: PartitionChangeRecord => handlePartitionChangeRecord(imageBuilder, rec)
-      case rec: RemoveTopicRecord => handleRemoveTopicRecord(imageBuilder, rec)
-      case rec: ConfigRecord => handleConfigRecord(rec)
-      case rec: ClientQuotaRecord => handleClientQuotaRecord(imageBuilder, rec)
-      case rec: ProducerIdsRecord => handleProducerIdRecord(rec)
-      case _ => throw new RuntimeException(s"Unhandled record $record with type $recordType")
-    }
+  def startPublishing(publisher: MetadataPublisher): CompletableFuture[Void] = {
+    val event = new StartPublishingEvent(publisher)
+    eventQueue.append(event)
+    event.future
   }
 
-  def handleRegisterBrokerRecord(imageBuilder: MetadataImageBuilder,
-                                 record: RegisterBrokerRecord): Unit = {
-    val broker = MetadataBroker(record)
-    imageBuilder.brokersBuilder().add(broker)
-  }
-
-  def handleUnregisterBrokerRecord(imageBuilder: MetadataImageBuilder,
-                                   record: UnregisterBrokerRecord): Unit = {
-    imageBuilder.brokersBuilder().remove(record.brokerId())
-  }
-
-  def handleTopicRecord(imageBuilder: MetadataImageBuilder,
-                        record: TopicRecord): Unit = {
-    imageBuilder.partitionsBuilder().addUuidMapping(record.name(), record.topicId())
-  }
+  class StartPublishingEvent(publisher: MetadataPublisher)
+      extends EventQueue.FailureLoggingEvent(log) {
+    val future = new CompletableFuture[Void]()
 
-  def handlePartitionRecord(imageBuilder: MetadataImageBuilder,
-                            record: PartitionRecord): Unit = {
-    imageBuilder.topicIdToName(record.topicId()) match {
-      case None => throw new RuntimeException(s"Unable to locate topic with ID ${record.topicId}")
-      case Some(name) =>
-        val partition = MetadataPartition(name, record)
-        imageBuilder.partitionsBuilder().set(partition)
+    override def run(): Unit = {
+      _publisher = Some(publisher)
+      log.info(s"Starting to publish metadata events at offset ${_highestMetadataOffset}.")
+      try {
+        maybePublish(_highestMetadataOffset)
+        future.complete(null)
+      } catch {
+        case e: Throwable => future.completeExceptionally(e)
+      }
     }
   }
 
-  def handleConfigRecord(record: ConfigRecord): Unit = {
-    val t = ConfigResource.Type.forId(record.resourceType())
-    if (t == ConfigResource.Type.UNKNOWN) {
-      throw new RuntimeException("Unable to understand config resource type " +
-        s"${Integer.valueOf(record.resourceType())}")
+  private def maybePublish(newHighestMetadataOffset: Long): Unit = {
+    _publisher match {
+      case None => // Nothing to do
+      case Some(publisher) => {
+        val delta = _delta
+        _image = _delta.apply()
+        _delta = new MetadataDelta(_image)
+        publisher.publish(newHighestMetadataOffset, delta, _image)
+      }
     }
-    val resource = new ConfigResource(t, record.resourceName())
-    configRepository.setConfig(resource, record.name(), record.value())
-  }
-
-  def handlePartitionChangeRecord(imageBuilder: MetadataImageBuilder,
-                                  record: PartitionChangeRecord): Unit = {
-    imageBuilder.partitionsBuilder().handleChange(record)
   }
 
-  def handleFenceBrokerRecord(imageBuilder: MetadataImageBuilder,
-                              record: FenceBrokerRecord): Unit = {
-    // TODO: add broker epoch to metadata cache, and check it here.
-    imageBuilder.brokersBuilder().changeFencing(record.id(), fenced = true)
+  override def handleLeaderChange(leaderAndEpoch: LeaderAndEpoch): Unit = {
+    // TODO: cache leaderAndEpoch so we can use the epoch in broker-initiated snapshots.
   }
 
-  def handleUnfenceBrokerRecord(imageBuilder: MetadataImageBuilder,
-                                record: UnfenceBrokerRecord): Unit = {
-    // TODO: add broker epoch to metadata cache, and check it here.
-    imageBuilder.brokersBuilder().changeFencing(record.id(), fenced = false)
+  override def beginShutdown(): Unit = {
+    eventQueue.beginShutdown("beginShutdown", new ShutdownEvent())
   }
 
-  def handleRemoveTopicRecord(imageBuilder: MetadataImageBuilder,
-                              record: RemoveTopicRecord): Unit = {
-    imageBuilder.topicIdToName(record.topicId()) match {
-      case None =>
-        throw new RuntimeException(s"Unable to locate topic with ID ${record.topicId}")
-
-      case Some(topicName) =>
-        info(s"Processing deletion of topic $topicName with id ${record.topicId}")
-        val removedPartitions = imageBuilder.partitionsBuilder().removeTopicById(record.topicId())
-        groupCoordinator.handleDeletedPartitions(removedPartitions.map(_.toTopicPartition).toSeq, RequestLocal.NoCaching)
-        configRepository.remove(new ConfigResource(ConfigResource.Type.TOPIC, topicName))
+  class ShutdownEvent() extends EventQueue.FailureLoggingEvent(log) {
+    override def run(): Unit = {
+      removeMetric(BrokerMetadataListener.MetadataBatchProcessingTimeUs)
+      removeMetric(BrokerMetadataListener.MetadataBatchSizes)
     }
   }
 
-  def handleClientQuotaRecord(imageBuilder: MetadataImageBuilder,
-                        record: ClientQuotaRecord): Unit = {
-    // TODO add quotas to MetadataImageBuilder
-    clientQuotaManager.handleQuotaRecord(record)
+  def close(): Unit = {
+    beginShutdown()
+    eventQueue.close()
   }
 
-  def handleProducerIdRecord(record: ProducerIdsRecord): Unit = {
-    // This is a no-op since brokers get their producer ID blocks directly from the controller via
-    // AllocateProducerIds RPC response
+  // VisibleForTesting
+  private[kafka] def getImageRecords(): CompletableFuture[util.List[ApiMessageAndVersion]] = {
+    val future = new CompletableFuture[util.List[ApiMessageAndVersion]]()
+    eventQueue.append(new GetImageRecordsEvent(future))
+    future
   }
 
-  class HandleNewLeaderEvent(leaderAndEpoch: LeaderAndEpoch)
-      extends EventQueue.FailureLoggingEvent(log) {
-    override def run(): Unit = {
-      val imageBuilder =
-        MetadataImageBuilder(brokerId, log, metadataCache.currentImage())
-      imageBuilder.controllerId(leaderAndEpoch.leaderId.asScala)
-      metadataCache.image(imageBuilder.build())
+  class GetImageRecordsEvent(future: CompletableFuture[util.List[ApiMessageAndVersion]])
+      extends EventQueue.FailureLoggingEvent(log) with Consumer[util.List[ApiMessageAndVersion]] {
+    val records = new util.ArrayList[ApiMessageAndVersion]()
+    override def accept(batch: util.List[ApiMessageAndVersion]): Unit = {
+      if (batch == null) {

Review comment:
       Why use the `null` here to finalize the image? Could we complete the future below where we call `_image.write`?
   
   Alternatively, we could use a composite Consumer + Closeable type so MetadataImage could call `close` instead of `out.accept(null)`.
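
   A rough sketch of that composite type, with a hypothetical name; it simply pairs Consumer with Closeable so the writer can signal completion through close() rather than a null sentinel.

```java
// Hypothetical interface: accept() receives each batch of records, and close()
// signals that the image has been fully written, replacing the out.accept(null)
// sentinel. The name and the narrowed close() signature are illustrative only.
import java.io.Closeable;
import java.util.List;
import java.util.function.Consumer;

import org.apache.kafka.server.common.ApiMessageAndVersion;

public interface ImageBatchConsumer
        extends Consumer<List<ApiMessageAndVersion>>, Closeable {
    @Override
    void close(); // narrowed to drop IOException so callers need not handle it
}
```

   GetImageRecordsEvent could then implement this and complete its future from close() instead of checking for a null batch.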




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org