Posted to commits@kafka.apache.org by jg...@apache.org on 2022/02/12 00:16:08 UTC

[kafka] branch trunk updated: MINOR: Clearer field names for ProducerIdsRecord and related classes (#11747)

This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new fc20c55  MINOR: Clearer field names for ProducerIdsRecord and related classes (#11747)
fc20c55 is described below

commit fc20c551d6b4f5c675d9bd80876950e23d194933
Author: Jason Gustafson <ja...@confluent.io>
AuthorDate: Fri Feb 11 16:14:31 2022 -0800

    MINOR: Clearer field names for ProducerIdsRecord and related classes (#11747)
    
    The current naming of the fields in `ProducerIdsRecord` is a little confusing with regard to whether the block range is inclusive or exclusive. This patch improves the naming to make this clearer: in the record class, `ProducerIdsEnd` is replaced by `NextProducerId`. Related classes such as `ProducerIdsBlock.java` have been updated with similar changes; the renamed accessors are illustrated in the short sketch after the reviewer line below.
    
    Reviewers: dengziming <de...@gmail.com>, David Arthur <mu...@gmail.com>
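
    A minimal sketch of the renamed ProducerIdsBlock accessors introduced by this commit (the constructor and method names are taken from the diff below; the wrapper class name and the concrete values are illustrative only):

        import org.apache.kafka.server.common.ProducerIdsBlock;

        public class ProducerIdsBlockNamingSketch {
            public static void main(String[] args) {
                // A block of 1000 producer IDs assigned to broker 1, starting at ID 0.
                ProducerIdsBlock block = new ProducerIdsBlock(1, 0L, 1000);

                // firstProducerId() and lastProducerId() are both inclusive bounds,
                // while nextBlockFirstId() is the exclusive upper bound, i.e. the
                // value now carried in the record's NextProducerId field.
                System.out.println(block.assignedBrokerId());  // 1
                System.out.println(block.firstProducerId());   // 0
                System.out.println(block.lastProducerId());    // 999  (inclusive end of this block)
                System.out.println(block.nextBlockFirstId());  // 1000 (first ID of the following block)
                System.out.println(block.size());              // 1000
            }
        }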
---
 .../scala/kafka/controller/KafkaController.scala   |  4 +-
 .../transaction/ProducerIdManager.scala            | 20 +++----
 core/src/main/scala/kafka/zk/ZkData.scala          |  6 +--
 .../kafka/controller/ProducerIdControlManager.java | 31 +++++------
 .../apache/kafka/controller/QuorumController.java  |  4 +-
 .../org/apache/kafka/image/ProducerIdsDelta.java   | 16 +++---
 .../org/apache/kafka/image/ProducerIdsImage.java   | 20 +++----
 .../common/metadata/ProducerIdsRecord.json         |  4 +-
 .../controller/ProducerIdControlManagerTest.java   | 26 ++++-----
 .../kafka/controller/QuorumControllerTest.java     |  2 +-
 .../apache/kafka/image/ProducerIdsImageTest.java   |  4 +-
 .../kafka/server/common/ProducerIdsBlock.java      | 62 ++++++++++++++--------
 .../kafka/server/common/ProducerIdsBlockTest.java  | 46 ++++++++++++++++
 13 files changed, 155 insertions(+), 90 deletions(-)

diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index 46ce379..c8d973a 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -2407,8 +2407,8 @@ class KafkaController(val config: KafkaConfig,
         case Left(error) => callback.apply(new AllocateProducerIdsResponseData().setErrorCode(error.code))
         case Right(pidBlock) => callback.apply(
           new AllocateProducerIdsResponseData()
-            .setProducerIdStart(pidBlock.producerIdStart())
-            .setProducerIdLen(pidBlock.producerIdLen()))
+            .setProducerIdStart(pidBlock.firstProducerId())
+            .setProducerIdLen(pidBlock.size()))
       }
     }
     eventManager.put(AllocateProducerIds(allocateProducerIdsRequest.brokerId,
diff --git a/core/src/main/scala/kafka/coordinator/transaction/ProducerIdManager.scala b/core/src/main/scala/kafka/coordinator/transaction/ProducerIdManager.scala
index b5d419d..e1f46eb 100644
--- a/core/src/main/scala/kafka/coordinator/transaction/ProducerIdManager.scala
+++ b/core/src/main/scala/kafka/coordinator/transaction/ProducerIdManager.scala
@@ -76,13 +76,13 @@ object ZkProducerIdManager {
           val currProducerIdBlock = ProducerIdBlockZNode.parseProducerIdBlockData(data)
           logger.debug(s"Read current producerId block $currProducerIdBlock, Zk path version $zkVersion")
 
-          if (currProducerIdBlock.producerIdEnd > Long.MaxValue - ProducerIdsBlock.PRODUCER_ID_BLOCK_SIZE) {
+          if (currProducerIdBlock.lastProducerId > Long.MaxValue - ProducerIdsBlock.PRODUCER_ID_BLOCK_SIZE) {
             // we have exhausted all producerIds (wow!), treat it as a fatal error
-            logger.fatal(s"Exhausted all producerIds as the next block's end producerId is will has exceeded long type limit (current block end producerId is ${currProducerIdBlock.producerIdEnd})")
+            logger.fatal(s"Exhausted all producerIds as the next block's end producerId is will has exceeded long type limit (current block end producerId is ${currProducerIdBlock.lastProducerId})")
             throw new KafkaException("Have exhausted all producerIds.")
           }
 
-          new ProducerIdsBlock(brokerId, currProducerIdBlock.producerIdEnd + 1L, ProducerIdsBlock.PRODUCER_ID_BLOCK_SIZE)
+          new ProducerIdsBlock(brokerId, currProducerIdBlock.nextBlockFirstId(), ProducerIdsBlock.PRODUCER_ID_BLOCK_SIZE)
         case None =>
           logger.debug(s"There is no producerId block yet (Zk path version $zkVersion), creating the first block")
           new ProducerIdsBlock(brokerId, 0L, ProducerIdsBlock.PRODUCER_ID_BLOCK_SIZE)
@@ -114,7 +114,7 @@ class ZkProducerIdManager(brokerId: Int,
   // grab the first block of producerIds
   this synchronized {
     allocateNewProducerIdBlock()
-    nextProducerId = currentProducerIdBlock.producerIdStart
+    nextProducerId = currentProducerIdBlock.firstProducerId
   }
 
   private def allocateNewProducerIdBlock(): Unit = {
@@ -126,9 +126,9 @@ class ZkProducerIdManager(brokerId: Int,
   def generateProducerId(): Long = {
     this synchronized {
       // grab a new block of producerIds if this block has been exhausted
-      if (nextProducerId > currentProducerIdBlock.producerIdEnd) {
+      if (nextProducerId > currentProducerIdBlock.lastProducerId) {
         allocateNewProducerIdBlock()
-        nextProducerId = currentProducerIdBlock.producerIdStart
+        nextProducerId = currentProducerIdBlock.firstProducerId
       }
       nextProducerId += 1
       nextProducerId - 1
@@ -158,13 +158,13 @@ class RPCProducerIdManager(brokerId: Int,
         nextProducerId += 1
 
         // Check if we need to fetch the next block
-        if (nextProducerId >= (currentProducerIdBlock.producerIdStart + currentProducerIdBlock.producerIdLen * ProducerIdManager.PidPrefetchThreshold)) {
+        if (nextProducerId >= (currentProducerIdBlock.firstProducerId + currentProducerIdBlock.size * ProducerIdManager.PidPrefetchThreshold)) {
           maybeRequestNextBlock()
         }
       }
 
       // If we've exhausted the current block, grab the next block (waiting if necessary)
-      if (nextProducerId > currentProducerIdBlock.producerIdEnd) {
+      if (nextProducerId > currentProducerIdBlock.lastProducerId) {
         val block = nextProducerIdBlock.poll(maxWaitMs, TimeUnit.MILLISECONDS)
         if (block == null) {
           throw Errors.REQUEST_TIMED_OUT.exception("Timed out waiting for next producer ID block")
@@ -172,7 +172,7 @@ class RPCProducerIdManager(brokerId: Int,
           block match {
             case Success(nextBlock) =>
               currentProducerIdBlock = nextBlock
-              nextProducerId = currentProducerIdBlock.producerIdStart
+              nextProducerId = currentProducerIdBlock.firstProducerId
             case Failure(t) => throw t
           }
         }
@@ -212,7 +212,7 @@ class RPCProducerIdManager(brokerId: Int,
       case Errors.NONE =>
         debug(s"Got next producer ID block from controller $data")
         // Do some sanity checks on the response
-        if (data.producerIdStart() < currentProducerIdBlock.producerIdEnd) {
+        if (data.producerIdStart() < currentProducerIdBlock.lastProducerId) {
           nextProducerIdBlock.put(Failure(new KafkaException(
             s"Producer ID block is not monotonic with current block: current=$currentProducerIdBlock response=$data")))
         } else if (data.producerIdStart() < 0 || data.producerIdLen() < 0 || data.producerIdStart() > Long.MaxValue - data.producerIdLen()) {
diff --git a/core/src/main/scala/kafka/zk/ZkData.scala b/core/src/main/scala/kafka/zk/ZkData.scala
index 0f6db4a..baed563 100644
--- a/core/src/main/scala/kafka/zk/ZkData.scala
+++ b/core/src/main/scala/kafka/zk/ZkData.scala
@@ -772,9 +772,9 @@ object ProducerIdBlockZNode {
 
   def generateProducerIdBlockJson(producerIdBlock: ProducerIdsBlock): Array[Byte] = {
     Json.encodeAsBytes(Map("version" -> CurrentVersion,
-      "broker" -> producerIdBlock.brokerId,
-      "block_start" -> producerIdBlock.producerIdStart.toString,
-      "block_end" -> producerIdBlock.producerIdEnd.toString).asJava
+      "broker" -> producerIdBlock.assignedBrokerId,
+      "block_start" -> producerIdBlock.firstProducerId.toString,
+      "block_end" -> producerIdBlock.lastProducerId.toString).asJava
     )
   }
 
diff --git a/metadata/src/main/java/org/apache/kafka/controller/ProducerIdControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/ProducerIdControlManager.java
index 7291f93..d6491e2 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/ProducerIdControlManager.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/ProducerIdControlManager.java
@@ -33,49 +33,50 @@ import java.util.List;
 public class ProducerIdControlManager {
 
     private final ClusterControlManager clusterControlManager;
-    private final TimelineLong lastProducerId;
+    private final TimelineLong nextProducerId; // Initializes to 0
 
     ProducerIdControlManager(ClusterControlManager clusterControlManager, SnapshotRegistry snapshotRegistry) {
         this.clusterControlManager = clusterControlManager;
-        this.lastProducerId = new TimelineLong(snapshotRegistry);
+        this.nextProducerId = new TimelineLong(snapshotRegistry);
     }
 
     ControllerResult<ProducerIdsBlock> generateNextProducerId(int brokerId, long brokerEpoch) {
         clusterControlManager.checkBrokerEpoch(brokerId, brokerEpoch);
 
-        long producerId = lastProducerId.get();
-
-        if (producerId > Long.MAX_VALUE - ProducerIdsBlock.PRODUCER_ID_BLOCK_SIZE) {
+        long firstProducerIdInBlock = nextProducerId.get();
+        if (firstProducerIdInBlock > Long.MAX_VALUE - ProducerIdsBlock.PRODUCER_ID_BLOCK_SIZE) {
             throw new UnknownServerException("Exhausted all producerIds as the next block's end producerId " +
-                "is will has exceeded long type limit");
+                "has exceeded the int64 type limit");
         }
 
-        long nextProducerId = producerId + ProducerIdsBlock.PRODUCER_ID_BLOCK_SIZE;
+        ProducerIdsBlock block = new ProducerIdsBlock(brokerId, firstProducerIdInBlock, ProducerIdsBlock.PRODUCER_ID_BLOCK_SIZE);
+        long newNextProducerId = block.nextBlockFirstId();
+
         ProducerIdsRecord record = new ProducerIdsRecord()
-            .setProducerIdsEnd(nextProducerId)
+            .setNextProducerId(newNextProducerId)
             .setBrokerId(brokerId)
             .setBrokerEpoch(brokerEpoch);
-        ProducerIdsBlock block = new ProducerIdsBlock(brokerId, producerId, ProducerIdsBlock.PRODUCER_ID_BLOCK_SIZE);
         return ControllerResult.of(Collections.singletonList(new ApiMessageAndVersion(record, (short) 0)), block);
     }
 
     void replay(ProducerIdsRecord record) {
-        long currentProducerId = lastProducerId.get();
-        if (record.producerIdsEnd() <= currentProducerId) {
-            throw new RuntimeException("Producer ID from record is not monotonically increasing");
+        long currentNextProducerId = nextProducerId.get();
+        if (record.nextProducerId() <= currentNextProducerId) {
+            throw new RuntimeException("Next Producer ID from replayed record (" + record.nextProducerId() + ")" +
+                " is not greater than current next Producer ID (" + currentNextProducerId + ")");
         } else {
-            lastProducerId.set(record.producerIdsEnd());
+            nextProducerId.set(record.nextProducerId());
         }
     }
 
     Iterator<List<ApiMessageAndVersion>> iterator(long epoch) {
         List<ApiMessageAndVersion> records = new ArrayList<>(1);
 
-        long producerId = lastProducerId.get(epoch);
+        long producerId = nextProducerId.get(epoch);
         if (producerId > 0) {
             records.add(new ApiMessageAndVersion(
                 new ProducerIdsRecord()
-                    .setProducerIdsEnd(producerId)
+                    .setNextProducerId(producerId)
                     .setBrokerId(0)
                     .setBrokerEpoch(0L),
                 (short) 0));
diff --git a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
index 1bdd6e3..825726f 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
@@ -1451,8 +1451,8 @@ public final class QuorumController implements Controller {
         return appendWriteEvent("allocateProducerIds",
             () -> producerIdControlManager.generateNextProducerId(request.brokerId(), request.brokerEpoch()))
             .thenApply(result -> new AllocateProducerIdsResponseData()
-                    .setProducerIdStart(result.producerIdStart())
-                    .setProducerIdLen(result.producerIdLen()));
+                    .setProducerIdStart(result.firstProducerId())
+                    .setProducerIdLen(result.size()));
     }
 
     @Override
diff --git a/metadata/src/main/java/org/apache/kafka/image/ProducerIdsDelta.java b/metadata/src/main/java/org/apache/kafka/image/ProducerIdsDelta.java
index c0c43ea..99dd207 100644
--- a/metadata/src/main/java/org/apache/kafka/image/ProducerIdsDelta.java
+++ b/metadata/src/main/java/org/apache/kafka/image/ProducerIdsDelta.java
@@ -21,18 +21,18 @@ import org.apache.kafka.common.metadata.ProducerIdsRecord;
 
 
 public final class ProducerIdsDelta {
-    private long highestSeenProducerId;
+    private long nextProducerId;
 
     public ProducerIdsDelta(ProducerIdsImage image) {
-        this.highestSeenProducerId = image.highestSeenProducerId();
+        this.nextProducerId = image.highestSeenProducerId();
     }
 
-    public void setHighestSeenProducerId(long highestSeenProducerId) {
-        this.highestSeenProducerId = highestSeenProducerId;
+    public void setNextProducerId(long highestSeenProducerId) {
+        this.nextProducerId = highestSeenProducerId;
     }
 
-    public long highestSeenProducerId() {
-        return highestSeenProducerId;
+    public long nextProducerId() {
+        return nextProducerId;
     }
 
     public void finishSnapshot() {
@@ -40,10 +40,10 @@ public final class ProducerIdsDelta {
     }
 
     public void replay(ProducerIdsRecord record) {
-        highestSeenProducerId = record.producerIdsEnd();
+        nextProducerId = record.nextProducerId();
     }
 
     public ProducerIdsImage apply() {
-        return new ProducerIdsImage(highestSeenProducerId);
+        return new ProducerIdsImage(nextProducerId);
     }
 }
diff --git a/metadata/src/main/java/org/apache/kafka/image/ProducerIdsImage.java b/metadata/src/main/java/org/apache/kafka/image/ProducerIdsImage.java
index 1d497e0..37851f9 100644
--- a/metadata/src/main/java/org/apache/kafka/image/ProducerIdsImage.java
+++ b/metadata/src/main/java/org/apache/kafka/image/ProducerIdsImage.java
@@ -34,23 +34,23 @@ import java.util.function.Consumer;
 public final class ProducerIdsImage {
     public final static ProducerIdsImage EMPTY = new ProducerIdsImage(-1L);
 
-    private final long highestSeenProducerId;
+    private final long nextProducerId;
 
-    public ProducerIdsImage(long highestSeenProducerId) {
-        this.highestSeenProducerId = highestSeenProducerId;
+    public ProducerIdsImage(long nextProducerId) {
+        this.nextProducerId = nextProducerId;
     }
 
     public long highestSeenProducerId() {
-        return highestSeenProducerId;
+        return nextProducerId;
     }
 
     public void write(Consumer<List<ApiMessageAndVersion>> out) {
-        if (highestSeenProducerId >= 0) {
+        if (nextProducerId >= 0) {
             out.accept(Collections.singletonList(new ApiMessageAndVersion(
                 new ProducerIdsRecord().
                     setBrokerId(-1).
                     setBrokerEpoch(-1).
-                    setProducerIdsEnd(highestSeenProducerId), (short) 0)));
+                    setNextProducerId(nextProducerId), (short) 0)));
         }
     }
 
@@ -58,20 +58,20 @@ public final class ProducerIdsImage {
     public boolean equals(Object o) {
         if (!(o instanceof ProducerIdsImage)) return false;
         ProducerIdsImage other = (ProducerIdsImage) o;
-        return highestSeenProducerId == other.highestSeenProducerId;
+        return nextProducerId == other.nextProducerId;
     }
 
     @Override
     public int hashCode() {
-        return Objects.hash(highestSeenProducerId);
+        return Objects.hash(nextProducerId);
     }
 
     @Override
     public String toString() {
-        return "ProducerIdsImage(highestSeenProducerId=" + highestSeenProducerId + ")";
+        return "ProducerIdsImage(highestSeenProducerId=" + nextProducerId + ")";
     }
 
     public boolean isEmpty() {
-        return highestSeenProducerId < 0;
+        return nextProducerId < 0;
     }
 }
diff --git a/metadata/src/main/resources/common/metadata/ProducerIdsRecord.json b/metadata/src/main/resources/common/metadata/ProducerIdsRecord.json
index 0467871..9f0242c 100644
--- a/metadata/src/main/resources/common/metadata/ProducerIdsRecord.json
+++ b/metadata/src/main/resources/common/metadata/ProducerIdsRecord.json
@@ -24,7 +24,7 @@
       "about": "The ID of the requesting broker" },
     { "name": "BrokerEpoch", "type": "int64", "versions": "0+", "default": "-1",
       "about": "The epoch of the requesting broker" },
-    { "name": "ProducerIdsEnd", "type": "int64", "versions": "0+",
-      "about": "The highest producer ID that has been generated"}
+    { "name": "NextProducerId", "type": "int64", "versions": "0+",
+      "about": "The next producerId that will be assigned (i.e. the first producerId in the next assigned block)"}
   ]
 }
diff --git a/metadata/src/test/java/org/apache/kafka/controller/ProducerIdControlManagerTest.java b/metadata/src/test/java/org/apache/kafka/controller/ProducerIdControlManagerTest.java
index 41cee2e..2161360 100644
--- a/metadata/src/test/java/org/apache/kafka/controller/ProducerIdControlManagerTest.java
+++ b/metadata/src/test/java/org/apache/kafka/controller/ProducerIdControlManagerTest.java
@@ -76,10 +76,10 @@ public class ProducerIdControlManagerTest {
     public void testInitialResult() {
         ControllerResult<ProducerIdsBlock> result =
             producerIdControlManager.generateNextProducerId(1, 100);
-        assertEquals(0, result.response().producerIdStart());
-        assertEquals(1000, result.response().producerIdLen());
+        assertEquals(0, result.response().firstProducerId());
+        assertEquals(1000, result.response().size());
         ProducerIdsRecord record = (ProducerIdsRecord) result.records().get(0).message();
-        assertEquals(1000, record.producerIdsEnd());
+        assertEquals(1000, record.nextProducerId());
     }
 
     @Test
@@ -88,11 +88,11 @@ public class ProducerIdControlManagerTest {
             new ProducerIdsRecord()
                 .setBrokerId(1)
                 .setBrokerEpoch(100)
-                .setProducerIdsEnd(42));
+                .setNextProducerId(42));
 
         ProducerIdsBlock range =
             producerIdControlManager.generateNextProducerId(1, 100).response();
-        assertEquals(42, range.producerIdStart());
+        assertEquals(42, range.firstProducerId());
 
         // Can't go backwards in Producer IDs
         assertThrows(RuntimeException.class, () -> {
@@ -100,19 +100,19 @@ public class ProducerIdControlManagerTest {
                 new ProducerIdsRecord()
                     .setBrokerId(1)
                     .setBrokerEpoch(100)
-                    .setProducerIdsEnd(40));
+                    .setNextProducerId(40));
         }, "Producer ID range must only increase");
         range = producerIdControlManager.generateNextProducerId(1, 100).response();
-        assertEquals(42, range.producerIdStart());
+        assertEquals(42, range.firstProducerId());
 
         // Gaps in the ID range are okay.
         producerIdControlManager.replay(
             new ProducerIdsRecord()
                 .setBrokerId(1)
                 .setBrokerEpoch(100)
-                .setProducerIdsEnd(50));
+                .setNextProducerId(50));
         range = producerIdControlManager.generateNextProducerId(1, 100).response();
-        assertEquals(50, range.producerIdStart());
+        assertEquals(50, range.firstProducerId());
     }
 
     @Test
@@ -132,7 +132,7 @@ public class ProducerIdControlManagerTest {
             new ProducerIdsRecord()
                 .setBrokerId(1)
                 .setBrokerEpoch(100)
-                .setProducerIdsEnd(Long.MAX_VALUE - 1));
+                .setNextProducerId(Long.MAX_VALUE - 1));
 
         assertThrows(UnknownServerException.class, () ->
             producerIdControlManager.generateNextProducerId(1, 100));
@@ -149,7 +149,7 @@ public class ProducerIdControlManagerTest {
         assertTrue(snapshotIterator.hasNext());
         List<ApiMessageAndVersion> batch = snapshotIterator.next();
         assertEquals(1, batch.size(), "Producer IDs record batch should only contain a single record");
-        assertEquals(range.producerIdStart() + range.producerIdLen(), ((ProducerIdsRecord) batch.get(0).message()).producerIdsEnd());
+        assertEquals(range.firstProducerId() + range.size(), ((ProducerIdsRecord) batch.get(0).message()).nextProducerId());
         assertFalse(snapshotIterator.hasNext(), "Producer IDs iterator should only contain a single batch");
 
         ProducerIdControlManager newProducerIdManager = new ProducerIdControlManager(clusterControl, snapshotRegistry);
@@ -159,9 +159,9 @@ public class ProducerIdControlManagerTest {
         }
 
         // Verify that after reloading state from this "snapshot", we don't produce any overlapping IDs
-        long lastProducerID = range.producerIdStart() + range.producerIdLen() - 1;
+        long lastProducerID = range.firstProducerId() + range.size() - 1;
         range = generateProducerIds(producerIdControlManager, 1, 100);
-        assertTrue(range.producerIdStart() > lastProducerID);
+        assertTrue(range.firstProducerId() > lastProducerID);
     }
 
     static ProducerIdsBlock generateProducerIds(
diff --git a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
index 84585f3..4035f68 100644
--- a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
+++ b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
@@ -565,7 +565,7 @@ public class QuorumControllerTest {
             new ApiMessageAndVersion(new ProducerIdsRecord().
                 setBrokerId(0).
                 setBrokerEpoch(brokerEpochs.get(0)).
-                setProducerIdsEnd(1000), (short) 0)
+                setNextProducerId(1000), (short) 0)
         );
     }
 
diff --git a/metadata/src/test/java/org/apache/kafka/image/ProducerIdsImageTest.java b/metadata/src/test/java/org/apache/kafka/image/ProducerIdsImageTest.java
index 2b147bd..819186d 100644
--- a/metadata/src/test/java/org/apache/kafka/image/ProducerIdsImageTest.java
+++ b/metadata/src/test/java/org/apache/kafka/image/ProducerIdsImageTest.java
@@ -46,11 +46,11 @@ public class ProducerIdsImageTest {
         DELTA1_RECORDS.add(new ApiMessageAndVersion(new ProducerIdsRecord().
             setBrokerId(2).
             setBrokerEpoch(100).
-            setProducerIdsEnd(456), (short) 0));
+            setNextProducerId(456), (short) 0));
         DELTA1_RECORDS.add(new ApiMessageAndVersion(new ProducerIdsRecord().
             setBrokerId(3).
             setBrokerEpoch(100).
-            setProducerIdsEnd(789), (short) 0));
+            setNextProducerId(789), (short) 0));
 
         DELTA1 = new ProducerIdsDelta(IMAGE1);
         RecordTestUtils.replayAll(DELTA1, DELTA1_RECORDS);
diff --git a/server-common/src/main/java/org/apache/kafka/server/common/ProducerIdsBlock.java b/server-common/src/main/java/org/apache/kafka/server/common/ProducerIdsBlock.java
index 8a0fd84..b2633bf 100644
--- a/server-common/src/main/java/org/apache/kafka/server/common/ProducerIdsBlock.java
+++ b/server-common/src/main/java/org/apache/kafka/server/common/ProducerIdsBlock.java
@@ -29,40 +29,58 @@ public class ProducerIdsBlock {
 
     public static final ProducerIdsBlock EMPTY = new ProducerIdsBlock(-1, 0, 0);
 
-    private final int brokerId;
-    private final long producerIdStart;
-    private final int producerIdLen;
+    private final int assignedBrokerId;
+    private final long firstProducerId;
+    private final int blockSize;
 
-    public ProducerIdsBlock(int brokerId, long producerIdStart, int producerIdLen) {
-        this.brokerId = brokerId;
-        this.producerIdStart = producerIdStart;
-        this.producerIdLen = producerIdLen;
+    public ProducerIdsBlock(int assignedBrokerId, long firstProducerId, int blockSize) {
+        this.assignedBrokerId = assignedBrokerId;
+        this.firstProducerId = firstProducerId;
+        this.blockSize = blockSize;
     }
 
-    public int brokerId() {
-        return brokerId;
+    /**
+     * Get the ID of the broker that this block was assigned to.
+     */
+    public int assignedBrokerId() {
+        return assignedBrokerId;
     }
 
-    public long producerIdStart() {
-        return producerIdStart;
+    /**
+     * Get the first ID (inclusive) to be assigned from this block.
+     */
+    public long firstProducerId() {
+        return firstProducerId;
     }
 
-    public int producerIdLen() {
-        return producerIdLen;
+    /**
+     * Get the number of IDs contained in this block.
+     */
+    public int size() {
+        return blockSize;
     }
 
-    public long producerIdEnd() {
-        return producerIdStart + producerIdLen - 1;
+    /**
+     * Get the last ID (inclusive) to be assigned from this block.
+     */
+    public long lastProducerId() {
+        return firstProducerId + blockSize - 1;
     }
 
+    /**
+     * Get the first ID of the next block following this one.
+     */
+    public long nextBlockFirstId() {
+        return firstProducerId + blockSize;
+    }
 
     @Override
     public String toString() {
-        return "ProducerIdsBlock{" +
-                "brokerId=" + brokerId +
-                ", producerIdStart=" + producerIdStart +
-                ", producerIdLen=" + producerIdLen +
-                '}';
+        return "ProducerIdsBlock(" +
+                "assignedBrokerId=" + assignedBrokerId +
+                ", firstProducerId=" + firstProducerId +
+                ", size=" + blockSize +
+                ')';
     }
 
     @Override
@@ -70,11 +88,11 @@ public class ProducerIdsBlock {
         if (this == o) return true;
         if (o == null || getClass() != o.getClass()) return false;
         ProducerIdsBlock that = (ProducerIdsBlock) o;
-        return brokerId == that.brokerId && producerIdStart == that.producerIdStart && producerIdLen == that.producerIdLen;
+        return assignedBrokerId == that.assignedBrokerId && firstProducerId == that.firstProducerId && blockSize == that.blockSize;
     }
 
     @Override
     public int hashCode() {
-        return Objects.hash(brokerId, producerIdStart, producerIdLen);
+        return Objects.hash(assignedBrokerId, firstProducerId, blockSize);
     }
 }
diff --git a/server-common/src/test/java/org/apache/kafka/server/common/ProducerIdsBlockTest.java b/server-common/src/test/java/org/apache/kafka/server/common/ProducerIdsBlockTest.java
new file mode 100644
index 0000000..f15c171
--- /dev/null
+++ b/server-common/src/test/java/org/apache/kafka/server/common/ProducerIdsBlockTest.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.common;
+
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+class ProducerIdsBlockTest {
+
+    @Test
+    public void testEmptyBlock() {
+        assertEquals(-1, ProducerIdsBlock.EMPTY.lastProducerId());
+        assertEquals(0, ProducerIdsBlock.EMPTY.nextBlockFirstId());
+        assertEquals(0, ProducerIdsBlock.EMPTY.size());
+    }
+
+    @Test
+    public void testDynamicBlock() {
+        long firstId = 1309418324L;
+        int blockSize = 5391;
+        int brokerId = 5;
+
+        ProducerIdsBlock block = new ProducerIdsBlock(brokerId, firstId, blockSize);
+        assertEquals(firstId, block.firstProducerId());
+        assertEquals(firstId + blockSize - 1, block.lastProducerId());
+        assertEquals(firstId + blockSize, block.nextBlockFirstId());
+        assertEquals(blockSize, block.size());
+        assertEquals(brokerId, block.assignedBrokerId());
+    }
+
+}
\ No newline at end of file