Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/07/07 17:30:37 UTC

[GitHub] [kafka] cmccabe opened a new pull request #10990: MINOR: the broker should use metadata.log.max.record.bytes.between.snapshots

cmccabe opened a new pull request #10990:
URL: https://github.com/apache/kafka/pull/10990


   The broker should trigger a snapshot once
   metadata.log.max.record.bytes.between.snapshots has been exceeded.
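
   A minimal sketch of the intended triggering logic, assuming a running byte
   counter compared against the configured threshold (the class and method names
   below are illustrative, not the actual BrokerMetadataListener code):

   ```scala
   // Hedged sketch: accumulate committed batch sizes and start a snapshot once
   // metadata.log.max.record.bytes.between.snapshots worth of records has accumulated.
   class SnapshotTrigger(maxBytesBetweenSnapshots: Long) {
     private var bytesSinceLastSnapshot = 0L

     // Called after each committed batch; returns true when a snapshot should start.
     def handleCommittedBytes(batchSizeBytes: Long): Boolean = {
       bytesSinceLastSnapshot += batchSizeBytes
       if (bytesSinceLastSnapshot >= maxBytesBetweenSnapshots) {
         bytesSinceLastSnapshot = 0L // reset only when a snapshot actually starts
         true
       } else {
         false
       }
     }
   }
   ```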


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] cmccabe commented on a change in pull request #10990: MINOR: the broker should use metadata.log.max.record.bytes.between.snapshots

Posted by GitBox <gi...@apache.org>.
cmccabe commented on a change in pull request #10990:
URL: https://github.com/apache/kafka/pull/10990#discussion_r667062463



##########
File path: core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataListenerTest.scala
##########
@@ -77,5 +80,124 @@ class BrokerMetadataListenerTest {
       listener.close()
     }
   }
+
+  class MockMetadataSnapshotter extends MetadataSnapshotter {
+    var image = MetadataImage.EMPTY
+    val failure = new AtomicReference[Throwable](null)
+    var activeSnapshotOffset = -1L
+    var prevCommittedOffset = -1L
+    var prevCommittedEpoch = -1
+    var prevLastContainedLogTime = -1L
+
+    override def maybeStartSnapshot(committedOffset: Long,
+                                    committedEpoch: Int,
+                                    lastContainedLogTime: Long,
+                                    newImage: MetadataImage): Boolean = {
+      try {
+        if (activeSnapshotOffset == -1L) {
+          assertTrue(prevCommittedOffset <= committedOffset)
+          assertTrue(prevCommittedEpoch <= committedEpoch)
+          assertTrue(prevLastContainedLogTime <= lastContainedLogTime)
+          prevCommittedOffset = committedOffset
+          prevCommittedEpoch = committedEpoch
+          prevLastContainedLogTime = lastContainedLogTime
+          image = newImage
+          activeSnapshotOffset = committedOffset
+          true
+        } else {
+          false
+        }
+      } catch {
+        case t: Throwable => failure.compareAndSet(null, t)
+      }
+    }
+  }
+
+  class MockMetadataPublisher extends MetadataPublisher {
+    var image = MetadataImage.EMPTY
+
+    override def publish(newHighestMetadataOffset: Long,
+                         delta: MetadataDelta,
+                         newImage: MetadataImage): Unit = {
+      image = newImage
+    }
+  }
+
+  private val FOO_ID = Uuid.fromString("jj1G9utnTuCegi_gpnRgYw")
+
+  private def generateManyRecords(listener: BrokerMetadataListener,
+                                  endOffset: Long): Unit = {
+    (0 to 10000).foreach { _ =>
+      listener.handleCommit(RecordTestUtils.mockBatchReader(endOffset,
+        util.Arrays.asList(new ApiMessageAndVersion(new PartitionChangeRecord().
+          setPartitionId(0).
+          setTopicId(FOO_ID).
+          setRemovingReplicas(Collections.singletonList(1)), 0.toShort),
+          new ApiMessageAndVersion(new PartitionChangeRecord().
+            setPartitionId(0).
+            setTopicId(FOO_ID).
+            setRemovingReplicas(Collections.emptyList()), 0.toShort))))
+    }
+    listener.getImageRecords().get()
+  }
+
+  @Test
+  def testCreateSnapshot(): Unit = {
+    val snapshotter = new MockMetadataSnapshotter()
+    val listener = new BrokerMetadataListener(0, Time.SYSTEM, None, 1000L, snapshotter)
+    try {
+      (0 to 3).foreach {
+        id => listener.handleCommit(RecordTestUtils.mockBatchReader(100L,
+            util.Arrays.asList(new ApiMessageAndVersion(new RegisterBrokerRecord().
+              setBrokerId(id).
+              setBrokerEpoch(100L).
+              setFenced(false).
+              setRack(null).
+              setIncarnationId(Uuid.fromString("GFBwlTcpQUuLYQ2ig05CS" + id)), 1))))
+      }
+      listener.handleCommit(RecordTestUtils.mockBatchReader(200L,
+        util.Arrays.asList(new ApiMessageAndVersion(new TopicRecord().
+            setName("foo").
+            setTopicId(FOO_ID), 1.toShort),
+          new ApiMessageAndVersion(new PartitionRecord().
+            setPartitionId(0).
+            setTopicId(FOO_ID).
+            setIsr(util.Arrays.asList(0, 1, 2)).
+            setLeader(0).
+            setReplicas(util.Arrays.asList(0, 1, 2)).
+            setRemovingReplicas(util.Arrays.asList(0, 1, 2)).
+            setAddingReplicas(util.Arrays.asList(0, 1, 2)), 1.toShort))))
+      listener.getImageRecords().get()
+      assertEquals(200L, listener.highestMetadataOffset())
+
+      // Check that we generate at least one snapshot once we see enough records.
+      assertEquals(-1L, snapshotter.prevCommittedOffset)
+      generateManyRecords(listener, 1000L);
+      assertEquals(1000L, snapshotter.prevCommittedOffset)
+      assertEquals(1000L, snapshotter.activeSnapshotOffset)
+      snapshotter.activeSnapshotOffset = -1L
+
+      // Test creating a new snapshot after publishing it.
+      val publisher = new MockMetadataPublisher()

Review comment:
       Publishing does involve a slightly different code path (although not that different).
   
   For one thing, once we're publishing, the delta will be empty when `_delta.apply()` is called (which is fine, it just means we get the image the delta is based on, unmodified).
   
   Anyway, it's an easy test to do, and quick as well, so I thought it was worth it here.
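
    A hedged sketch of that point, assuming `MetadataDelta.apply()` materializes the image a delta is based on when nothing has been replayed into it (the equality check is illustrative):

    ```scala
    import org.apache.kafka.image.{MetadataDelta, MetadataImage}

    val base = MetadataImage.EMPTY
    val delta = new MetadataDelta(base) // no records replayed, so the delta is empty
    val next = delta.apply()            // the base image comes back unmodified
    assert(next == base)
    ```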







[GitHub] [kafka] cmccabe commented on a change in pull request #10990: MINOR: the broker should use metadata.log.max.record.bytes.between.snapshots

Posted by GitBox <gi...@apache.org>.
cmccabe commented on a change in pull request #10990:
URL: https://github.com/apache/kafka/pull/10990#discussion_r667060078



##########
File path: core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataListenerTest.scala
##########
@@ -77,5 +80,124 @@ class BrokerMetadataListenerTest {
       listener.close()
     }
   }
+
+  class MockMetadataSnapshotter extends MetadataSnapshotter {
+    var image = MetadataImage.EMPTY
+    val failure = new AtomicReference[Throwable](null)
+    var activeSnapshotOffset = -1L
+    var prevCommittedOffset = -1L
+    var prevCommittedEpoch = -1
+    var prevLastContainedLogTime = -1L
+
+    override def maybeStartSnapshot(committedOffset: Long,
+                                    committedEpoch: Int,
+                                    lastContainedLogTime: Long,
+                                    newImage: MetadataImage): Boolean = {
+      try {
+        if (activeSnapshotOffset == -1L) {
+          assertTrue(prevCommittedOffset <= committedOffset)
+          assertTrue(prevCommittedEpoch <= committedEpoch)
+          assertTrue(prevLastContainedLogTime <= lastContainedLogTime)
+          prevCommittedOffset = committedOffset
+          prevCommittedEpoch = committedEpoch
+          prevLastContainedLogTime = lastContainedLogTime
+          image = newImage
+          activeSnapshotOffset = committedOffset
+          true
+        } else {
+          false
+        }
+      } catch {
+        case t: Throwable => failure.compareAndSet(null, t)
+      }
+    }
+  }
+
+  class MockMetadataPublisher extends MetadataPublisher {
+    var image = MetadataImage.EMPTY
+
+    override def publish(newHighestMetadataOffset: Long,
+                         delta: MetadataDelta,
+                         newImage: MetadataImage): Unit = {
+      image = newImage
+    }
+  }
+
+  private val FOO_ID = Uuid.fromString("jj1G9utnTuCegi_gpnRgYw")
+
+  private def generateManyRecords(listener: BrokerMetadataListener,
+                                  endOffset: Long): Unit = {
+    (0 to 10000).foreach { _ =>
+      listener.handleCommit(RecordTestUtils.mockBatchReader(endOffset,

Review comment:
       It makes it easier, since otherwise I'd have to figure out exactly which record batch triggered the snapshot... that would also make the test brittle, since record sizes will change as fields are added, etc.







[GitHub] [kafka] hachikuji commented on a change in pull request #10990: MINOR: the broker should use metadata.log.max.record.bytes.between.snapshots

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #10990:
URL: https://github.com/apache/kafka/pull/10990#discussion_r666512156



##########
File path: core/src/main/scala/kafka/server/metadata/BrokerMetadataSnapshotter.scala
##########
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.server.metadata
+
+import java.util.concurrent.RejectedExecutionException
+
+import kafka.utils.Logging
+import org.apache.kafka.image.MetadataImage
+import org.apache.kafka.common.utils.{LogContext, Time}
+import org.apache.kafka.queue.{EventQueue, KafkaEventQueue}
+import org.apache.kafka.server.common.ApiMessageAndVersion
+import org.apache.kafka.snapshot.SnapshotWriter
+
+
+trait SnapshotWriterBuilder {
+  def build(committedOffset: Long,
+            committedEpoch: Int,
+            lastContainedLogTime: Long): SnapshotWriter[ApiMessageAndVersion]
+}
+
+class BrokerMetadataSnapshotter(
+  brokerId: Int,
+  val time: Time,
+  threadNamePrefix: Option[String],
+  writerBuilder: SnapshotWriterBuilder
+) extends Logging with MetadataSnapshotter {
+  private val logContext = new LogContext(s"[BrokerMetadataSnapshotter id=${brokerId}] ")
+  private val log = logContext.logger(classOf[BrokerMetadataSnapshotter])

Review comment:
       nit: we shouldn't need this. Instead of `log.info`, you can use `info`, since we are extending `Logging`.
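
    A short sketch of the suggested pattern, assuming the usual `kafka.utils.Logging` mixin, where `logIdent` prefixes each message (the class body is illustrative):

    ```scala
    import kafka.utils.Logging

    class SnapshotterLoggingSketch(brokerId: Int) extends Logging {
      logIdent = s"[BrokerMetadataSnapshotter id=$brokerId] "

      def start(): Unit = {
        info("Starting...") // trait-provided helper; no separate `log` field needed
      }
    }
    ```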

##########
File path: core/src/main/scala/kafka/server/metadata/BrokerMetadataSnapshotter.scala
##########
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.server.metadata
+
+import java.util.concurrent.RejectedExecutionException
+
+import kafka.utils.Logging
+import org.apache.kafka.image.MetadataImage
+import org.apache.kafka.common.utils.{LogContext, Time}
+import org.apache.kafka.queue.{EventQueue, KafkaEventQueue}
+import org.apache.kafka.server.common.ApiMessageAndVersion
+import org.apache.kafka.snapshot.SnapshotWriter
+
+
+trait SnapshotWriterBuilder {
+  def build(committedOffset: Long,
+            committedEpoch: Int,
+            lastContainedLogTime: Long): SnapshotWriter[ApiMessageAndVersion]
+}
+
+class BrokerMetadataSnapshotter(
+  brokerId: Int,
+  val time: Time,
+  threadNamePrefix: Option[String],
+  writerBuilder: SnapshotWriterBuilder
+) extends Logging with MetadataSnapshotter {
+  private val logContext = new LogContext(s"[BrokerMetadataSnapshotter id=${brokerId}] ")
+  private val log = logContext.logger(classOf[BrokerMetadataSnapshotter])
+  logIdent = logContext.logPrefix()
+
+  /**
+   * The offset of the snapshot in progress, or -1 if there isn't one. Accessed only under
+   * the object lock.
+   */
+  private var _currentSnapshotOffset = -1L
+
+  /**
+   * The event queue which runs this listener.
+   */
+  val eventQueue = new KafkaEventQueue(time, logContext, threadNamePrefix.getOrElse(""))
+
+  override def maybeStartSnapshot(committedOffset: Long,
+                                  committedEpoch: Int,
+                                  lastContainedLogTime: Long,
+                                  image: MetadataImage): Boolean = synchronized {
+    if (_currentSnapshotOffset == -1L) {
+      val writer = writerBuilder.build(committedOffset, committedEpoch, lastContainedLogTime)
+      _currentSnapshotOffset = committedOffset
+      info(s"Creating a new snapshot at offset ${committedOffset}...")
+      eventQueue.append(new CreateSnapshotEvent(image, writer))
+      true
+    } else {
+      warn(s"Declining to create a new snapshot at offset ${committedOffset} because " +
+           s"there is already a snapshot in progress at offset ${_currentSnapshotOffset}")
+      false
+    }
+  }
+
+  class CreateSnapshotEvent(image: MetadataImage,
+                            writer: SnapshotWriter[ApiMessageAndVersion])
+        extends EventQueue.Event {
+    override def run(): Unit = {
+      try {
+        image.write(writer.append(_))
+        writer.freeze()
+      } finally {
+        try {
+          writer.close()
+        } finally {
+          BrokerMetadataSnapshotter.this.synchronized {
+            _currentSnapshotOffset = -1L
+          }
+        }
+      }
+    }
+
+    override def handleException(e: Throwable): Unit = {
+      if (e.isInstanceOf[RejectedExecutionException]) {

Review comment:
       nit: nicer as a `match`
   ```scala
   e match {
     case _: RejectedExecutionException =>
   ...
   ```
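
    A hedged expansion of that fragment into a runnable shape; the branch bodies are assumptions, not necessarily the PR's actual handler:

    ```scala
    import java.util.concurrent.RejectedExecutionException
    import kafka.utils.Logging
    import org.apache.kafka.queue.EventQueue

    class CreateSnapshotEventSketch extends EventQueue.Event with Logging {
      override def run(): Unit = () // snapshot writing elided in this sketch

      override def handleException(e: Throwable): Unit = e match {
        case _: RejectedExecutionException =>
          info("Not processing the snapshot event because the queue is closed.")
        case t =>
          error("Unexpected exception while handling a snapshot event", t)
      }
    }
    ```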

##########
File path: core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataSnapshotterTest.scala
##########
@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server.metadata
+
+import java.nio.ByteBuffer
+import java.util.Optional
+import java.util.concurrent.{CompletableFuture, CountDownLatch}
+
+import org.apache.kafka.common.memory.MemoryPool
+import org.apache.kafka.common.protocol.ByteBufferAccessor
+import org.apache.kafka.common.record.{CompressionType, MemoryRecords}
+import org.apache.kafka.common.utils.Time
+import org.apache.kafka.image.{MetadataDelta, MetadataImage, MetadataImageTest}
+import org.apache.kafka.metadata.MetadataRecordSerde
+import org.apache.kafka.queue.EventQueue
+import org.apache.kafka.raft.OffsetAndEpoch
+import org.apache.kafka.server.common.ApiMessageAndVersion
+import org.apache.kafka.snapshot.{MockRawSnapshotWriter, SnapshotWriter}
+import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertTrue}
+import org.junit.jupiter.api.Test
+
+
+class BrokerMetadataSnapshotterTest {
+  val SERDE = new MetadataRecordSerde()

Review comment:
       We may as well move this to MetadataRecordSerde. We could also get rid of the static instance in RecordTestUtils.
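
    A sketch of what that might look like, assuming a single shared serde is exposed for reuse (the object and field names are hypothetical):

    ```scala
    import org.apache.kafka.metadata.MetadataRecordSerde

    // One shared instance for tests to import, so each test class stops
    // constructing its own and RecordTestUtils can drop its static copy.
    object MetadataRecordSerdes {
      val Instance: MetadataRecordSerde = new MetadataRecordSerde()
    }
    ```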

##########
File path: core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataListenerTest.scala
##########
@@ -18,28 +18,31 @@
 package kafka.server.metadata
 
 import java.util
+import java.util.concurrent.atomic.AtomicReference
 import java.util.{Collections, Optional}
 
 import org.apache.kafka.common.{Endpoint, Uuid}
-import org.apache.kafka.common.metadata.RegisterBrokerRecord
+import org.apache.kafka.common.metadata.{PartitionChangeRecord, PartitionRecord, RegisterBrokerRecord, TopicRecord}
 import org.apache.kafka.common.utils.Time
 import org.apache.kafka.image.{MetadataDelta, MetadataImage}
 import org.apache.kafka.metadata.{BrokerRegistration, RecordTestUtils, VersionRange}
 import org.apache.kafka.server.common.ApiMessageAndVersion
-import org.junit.jupiter.api.Assertions.assertEquals
+import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue}
 import org.junit.jupiter.api.Test
 
 
 class BrokerMetadataListenerTest {
   @Test
   def testCreateAndClose(): Unit = {
-    val listener = new BrokerMetadataListener(0, Time.SYSTEM, None)
+    val listener = new BrokerMetadataListener(0, Time.SYSTEM, None, 1000000L,
+      (_, _, _, _) => throw new UnsupportedOperationException())

Review comment:
       nit: any harm in using `MetadataSnapshotter` here and below?

##########
File path: core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala
##########
@@ -102,15 +121,28 @@ class BrokerMetadataListener(
       } finally {
         reader.close()
       }
-      maybePublish(results.highestMetadataOffset)
+      _bytesSinceLastSnapshot = _bytesSinceLastSnapshot + results.numBytes
+      _publisher.foreach(publish(_, results.highestMetadataOffset))
+      if (shouldSnapshot()) {
+        if (snapshotter.maybeStartSnapshot(results.highestMetadataOffset,
+                                           _highestEpoch,
+                                           _highestTimestamp,
+                                           _delta.apply())) {
+          _bytesSinceLastSnapshot = 0L
+        }
+      }
     }
   }
 
+  private def shouldSnapshot(): Boolean = {
+    _bytesSinceLastSnapshot >= maxBytesBetweenSnapshots
+  }
+
   /**
    * Handle metadata snapshots
    */
   override def handleSnapshot(reader: SnapshotReader[ApiMessageAndVersion]): Unit =
-    eventQueue.append(new HandleSnapshotEvent(reader))
+      eventQueue.append(new HandleSnapshotEvent(reader))

Review comment:
       nit: old alignment seemed right

##########
File path: core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataListenerTest.scala
##########
@@ -77,5 +80,124 @@ class BrokerMetadataListenerTest {
       listener.close()
     }
   }
+
+  class MockMetadataSnapshotter extends MetadataSnapshotter {
+    var image = MetadataImage.EMPTY
+    val failure = new AtomicReference[Throwable](null)
+    var activeSnapshotOffset = -1L
+    var prevCommittedOffset = -1L
+    var prevCommittedEpoch = -1
+    var prevLastContainedLogTime = -1L
+
+    override def maybeStartSnapshot(committedOffset: Long,
+                                    committedEpoch: Int,
+                                    lastContainedLogTime: Long,
+                                    newImage: MetadataImage): Boolean = {
+      try {
+        if (activeSnapshotOffset == -1L) {
+          assertTrue(prevCommittedOffset <= committedOffset)
+          assertTrue(prevCommittedEpoch <= committedEpoch)
+          assertTrue(prevLastContainedLogTime <= lastContainedLogTime)
+          prevCommittedOffset = committedOffset
+          prevCommittedEpoch = committedEpoch
+          prevLastContainedLogTime = lastContainedLogTime
+          image = newImage
+          activeSnapshotOffset = committedOffset
+          true
+        } else {
+          false
+        }
+      } catch {
+        case t: Throwable => failure.compareAndSet(null, t)
+      }
+    }
+  }
+
+  class MockMetadataPublisher extends MetadataPublisher {
+    var image = MetadataImage.EMPTY
+
+    override def publish(newHighestMetadataOffset: Long,
+                         delta: MetadataDelta,
+                         newImage: MetadataImage): Unit = {
+      image = newImage
+    }
+  }
+
+  private val FOO_ID = Uuid.fromString("jj1G9utnTuCegi_gpnRgYw")
+
+  private def generateManyRecords(listener: BrokerMetadataListener,
+                                  endOffset: Long): Unit = {
+    (0 to 10000).foreach { _ =>
+      listener.handleCommit(RecordTestUtils.mockBatchReader(endOffset,

Review comment:
       Not sure it matters, but it is a little strange that we use the same end offset for all of these records.

##########
File path: core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataListenerTest.scala
##########
@@ -77,5 +80,124 @@ class BrokerMetadataListenerTest {
       listener.close()
     }
   }
+
+  class MockMetadataSnapshotter extends MetadataSnapshotter {
+    var image = MetadataImage.EMPTY
+    val failure = new AtomicReference[Throwable](null)
+    var activeSnapshotOffset = -1L
+    var prevCommittedOffset = -1L
+    var prevCommittedEpoch = -1
+    var prevLastContainedLogTime = -1L
+
+    override def maybeStartSnapshot(committedOffset: Long,
+                                    committedEpoch: Int,
+                                    lastContainedLogTime: Long,
+                                    newImage: MetadataImage): Boolean = {
+      try {
+        if (activeSnapshotOffset == -1L) {
+          assertTrue(prevCommittedOffset <= committedOffset)
+          assertTrue(prevCommittedEpoch <= committedEpoch)
+          assertTrue(prevLastContainedLogTime <= lastContainedLogTime)
+          prevCommittedOffset = committedOffset
+          prevCommittedEpoch = committedEpoch
+          prevLastContainedLogTime = lastContainedLogTime
+          image = newImage
+          activeSnapshotOffset = committedOffset
+          true
+        } else {
+          false
+        }
+      } catch {
+        case t: Throwable => failure.compareAndSet(null, t)
+      }
+    }
+  }
+
+  class MockMetadataPublisher extends MetadataPublisher {
+    var image = MetadataImage.EMPTY
+
+    override def publish(newHighestMetadataOffset: Long,
+                         delta: MetadataDelta,
+                         newImage: MetadataImage): Unit = {
+      image = newImage
+    }
+  }
+
+  private val FOO_ID = Uuid.fromString("jj1G9utnTuCegi_gpnRgYw")
+
+  private def generateManyRecords(listener: BrokerMetadataListener,
+                                  endOffset: Long): Unit = {
+    (0 to 10000).foreach { _ =>
+      listener.handleCommit(RecordTestUtils.mockBatchReader(endOffset,
+        util.Arrays.asList(new ApiMessageAndVersion(new PartitionChangeRecord().
+          setPartitionId(0).
+          setTopicId(FOO_ID).
+          setRemovingReplicas(Collections.singletonList(1)), 0.toShort),
+          new ApiMessageAndVersion(new PartitionChangeRecord().
+            setPartitionId(0).
+            setTopicId(FOO_ID).
+            setRemovingReplicas(Collections.emptyList()), 0.toShort))))
+    }
+    listener.getImageRecords().get()
+  }
+
+  @Test
+  def testCreateSnapshot(): Unit = {
+    val snapshotter = new MockMetadataSnapshotter()
+    val listener = new BrokerMetadataListener(0, Time.SYSTEM, None, 1000L, snapshotter)
+    try {
+      (0 to 3).foreach {
+        id => listener.handleCommit(RecordTestUtils.mockBatchReader(100L,
+            util.Arrays.asList(new ApiMessageAndVersion(new RegisterBrokerRecord().
+              setBrokerId(id).
+              setBrokerEpoch(100L).
+              setFenced(false).
+              setRack(null).
+              setIncarnationId(Uuid.fromString("GFBwlTcpQUuLYQ2ig05CS" + id)), 1))))
+      }
+      listener.handleCommit(RecordTestUtils.mockBatchReader(200L,
+        util.Arrays.asList(new ApiMessageAndVersion(new TopicRecord().
+            setName("foo").
+            setTopicId(FOO_ID), 1.toShort),
+          new ApiMessageAndVersion(new PartitionRecord().
+            setPartitionId(0).
+            setTopicId(FOO_ID).
+            setIsr(util.Arrays.asList(0, 1, 2)).
+            setLeader(0).
+            setReplicas(util.Arrays.asList(0, 1, 2)).
+            setRemovingReplicas(util.Arrays.asList(0, 1, 2)).
+            setAddingReplicas(util.Arrays.asList(0, 1, 2)), 1.toShort))))
+      listener.getImageRecords().get()
+      assertEquals(200L, listener.highestMetadataOffset())
+
+      // Check that we generate at least one snapshot once we see enough records.
+      assertEquals(-1L, snapshotter.prevCommittedOffset)
+      generateManyRecords(listener, 1000L);
+      assertEquals(1000L, snapshotter.prevCommittedOffset)
+      assertEquals(1000L, snapshotter.activeSnapshotOffset)
+      snapshotter.activeSnapshotOffset = -1L
+
+      // Test creating a new snapshot after publishing it.
+      val publisher = new MockMetadataPublisher()

Review comment:
       I'm probably missing something, but it's not clear to me why publishing makes this test case interesting.







[GitHub] [kafka] cmccabe commented on pull request #10990: MINOR: the broker should use metadata.log.max.record.bytes.between.snapshots

Posted by GitBox <gi...@apache.org>.
cmccabe commented on pull request #10990:
URL: https://github.com/apache/kafka/pull/10990#issuecomment-876562610


   That one build failure looks like some kind of Jenkins weirdness.
   ```
   [2021-07-08T00:12:38.900Z] > requirement failed: Source file '/tmp/sbt_8c62b7b4/META-INF/MANIFEST.MF' does not exist.
   ```





[GitHub] [kafka] cmccabe merged pull request #10990: MINOR: the broker should use metadata.log.max.record.bytes.between.snapshots

Posted by GitBox <gi...@apache.org>.
cmccabe merged pull request #10990:
URL: https://github.com/apache/kafka/pull/10990


   





[GitHub] [kafka] cmccabe commented on a change in pull request #10990: MINOR: the broker should use metadata.log.max.record.bytes.between.snapshots

Posted by GitBox <gi...@apache.org>.
cmccabe commented on a change in pull request #10990:
URL: https://github.com/apache/kafka/pull/10990#discussion_r667066884



##########
File path: core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataSnapshotterTest.scala
##########
@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server.metadata
+
+import java.nio.ByteBuffer
+import java.util.Optional
+import java.util.concurrent.{CompletableFuture, CountDownLatch}
+
+import org.apache.kafka.common.memory.MemoryPool
+import org.apache.kafka.common.protocol.ByteBufferAccessor
+import org.apache.kafka.common.record.{CompressionType, MemoryRecords}
+import org.apache.kafka.common.utils.Time
+import org.apache.kafka.image.{MetadataDelta, MetadataImage, MetadataImageTest}
+import org.apache.kafka.metadata.MetadataRecordSerde
+import org.apache.kafka.queue.EventQueue
+import org.apache.kafka.raft.OffsetAndEpoch
+import org.apache.kafka.server.common.ApiMessageAndVersion
+import org.apache.kafka.snapshot.{MockRawSnapshotWriter, SnapshotWriter}
+import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertTrue}
+import org.junit.jupiter.api.Test
+
+
+class BrokerMetadataSnapshotterTest {
+  val SERDE = new MetadataRecordSerde()

Review comment:
       yeah, good idea







[GitHub] [kafka] cmccabe commented on a change in pull request #10990: MINOR: the broker should use metadata.log.max.record.bytes.between.snapshots

Posted by GitBox <gi...@apache.org>.
cmccabe commented on a change in pull request #10990:
URL: https://github.com/apache/kafka/pull/10990#discussion_r667059387



##########
File path: core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataListenerTest.scala
##########
@@ -18,28 +18,31 @@
 package kafka.server.metadata
 
 import java.util
+import java.util.concurrent.atomic.AtomicReference
 import java.util.{Collections, Optional}
 
 import org.apache.kafka.common.{Endpoint, Uuid}
-import org.apache.kafka.common.metadata.RegisterBrokerRecord
+import org.apache.kafka.common.metadata.{PartitionChangeRecord, PartitionRecord, RegisterBrokerRecord, TopicRecord}
 import org.apache.kafka.common.utils.Time
 import org.apache.kafka.image.{MetadataDelta, MetadataImage}
 import org.apache.kafka.metadata.{BrokerRegistration, RecordTestUtils, VersionRange}
 import org.apache.kafka.server.common.ApiMessageAndVersion
-import org.junit.jupiter.api.Assertions.assertEquals
+import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue}
 import org.junit.jupiter.api.Test
 
 
 class BrokerMetadataListenerTest {
   @Test
   def testCreateAndClose(): Unit = {
-    val listener = new BrokerMetadataListener(0, Time.SYSTEM, None)
+    val listener = new BrokerMetadataListener(0, Time.SYSTEM, None, 1000000L,
+      (_, _, _, _) => throw new UnsupportedOperationException())

Review comment:
       It starts a thread, which we would have to stop. Also, I'd like to be sure that we didn't try to create a snapshot in this case (that is, that we never hit the UnsupportedOperationException).
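
    For reference, a hedged sketch of that idea: the lambda satisfies `MetadataSnapshotter` via SAM conversion without starting `BrokerMetadataSnapshotter`'s event-queue thread, and any unexpected snapshot attempt fails loudly:

    ```scala
    val neverSnapshots: MetadataSnapshotter =
      (_, _, _, _) => throw new UnsupportedOperationException("no snapshot expected here")
    ```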






