You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2022/02/12 00:16:08 UTC
[kafka] branch trunk updated: MINOR: Clearer field names for ProducerIdsRecord and related classes (#11747)
This is an automated email from the ASF dual-hosted git repository.
jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new fc20c55 MINOR: Clearer field names for ProducerIdsRecord and related classes (#11747)
fc20c55 is described below
commit fc20c551d6b4f5c675d9bd80876950e23d194933
Author: Jason Gustafson <ja...@confluent.io>
AuthorDate: Fri Feb 11 16:14:31 2022 -0800
MINOR: Clearer field names for ProducerIdsRecord and related classes (#11747)
The current naming of the fields in `ProducerIdsRecord` is a little confusing with regard to whether the block range is inclusive or exclusive. This patch tries to improve the naming to make this clearer. In the record class, we now use `NextProducerId` instead of `ProducerIdsEnd`. We have also updated related classes such as `ProducerIdsBlock.java` with similar changes.
Reviewers: dengziming <de...@gmail.com>, David Arthur <mu...@gmail.com>
---
.../scala/kafka/controller/KafkaController.scala | 4 +-
.../transaction/ProducerIdManager.scala | 20 +++----
core/src/main/scala/kafka/zk/ZkData.scala | 6 +--
.../kafka/controller/ProducerIdControlManager.java | 31 +++++------
.../apache/kafka/controller/QuorumController.java | 4 +-
.../org/apache/kafka/image/ProducerIdsDelta.java | 16 +++---
.../org/apache/kafka/image/ProducerIdsImage.java | 20 +++----
.../common/metadata/ProducerIdsRecord.json | 4 +-
.../controller/ProducerIdControlManagerTest.java | 26 ++++-----
.../kafka/controller/QuorumControllerTest.java | 2 +-
.../apache/kafka/image/ProducerIdsImageTest.java | 4 +-
.../kafka/server/common/ProducerIdsBlock.java | 62 ++++++++++++++--------
.../kafka/server/common/ProducerIdsBlockTest.java | 46 ++++++++++++++++
13 files changed, 155 insertions(+), 90 deletions(-)
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index 46ce379..c8d973a 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -2407,8 +2407,8 @@ class KafkaController(val config: KafkaConfig,
case Left(error) => callback.apply(new AllocateProducerIdsResponseData().setErrorCode(error.code))
case Right(pidBlock) => callback.apply(
new AllocateProducerIdsResponseData()
- .setProducerIdStart(pidBlock.producerIdStart())
- .setProducerIdLen(pidBlock.producerIdLen()))
+ .setProducerIdStart(pidBlock.firstProducerId())
+ .setProducerIdLen(pidBlock.size()))
}
}
eventManager.put(AllocateProducerIds(allocateProducerIdsRequest.brokerId,
diff --git a/core/src/main/scala/kafka/coordinator/transaction/ProducerIdManager.scala b/core/src/main/scala/kafka/coordinator/transaction/ProducerIdManager.scala
index b5d419d..e1f46eb 100644
--- a/core/src/main/scala/kafka/coordinator/transaction/ProducerIdManager.scala
+++ b/core/src/main/scala/kafka/coordinator/transaction/ProducerIdManager.scala
@@ -76,13 +76,13 @@ object ZkProducerIdManager {
val currProducerIdBlock = ProducerIdBlockZNode.parseProducerIdBlockData(data)
logger.debug(s"Read current producerId block $currProducerIdBlock, Zk path version $zkVersion")
- if (currProducerIdBlock.producerIdEnd > Long.MaxValue - ProducerIdsBlock.PRODUCER_ID_BLOCK_SIZE) {
+ if (currProducerIdBlock.lastProducerId > Long.MaxValue - ProducerIdsBlock.PRODUCER_ID_BLOCK_SIZE) {
// we have exhausted all producerIds (wow!), treat it as a fatal error
- logger.fatal(s"Exhausted all producerIds as the next block's end producerId is will has exceeded long type limit (current block end producerId is ${currProducerIdBlock.producerIdEnd})")
+ logger.fatal(s"Exhausted all producerIds as the next block's end producerId is will has exceeded long type limit (current block end producerId is ${currProducerIdBlock.lastProducerId})")
throw new KafkaException("Have exhausted all producerIds.")
}
- new ProducerIdsBlock(brokerId, currProducerIdBlock.producerIdEnd + 1L, ProducerIdsBlock.PRODUCER_ID_BLOCK_SIZE)
+ new ProducerIdsBlock(brokerId, currProducerIdBlock.nextBlockFirstId(), ProducerIdsBlock.PRODUCER_ID_BLOCK_SIZE)
case None =>
logger.debug(s"There is no producerId block yet (Zk path version $zkVersion), creating the first block")
new ProducerIdsBlock(brokerId, 0L, ProducerIdsBlock.PRODUCER_ID_BLOCK_SIZE)
@@ -114,7 +114,7 @@ class ZkProducerIdManager(brokerId: Int,
// grab the first block of producerIds
this synchronized {
allocateNewProducerIdBlock()
- nextProducerId = currentProducerIdBlock.producerIdStart
+ nextProducerId = currentProducerIdBlock.firstProducerId
}
private def allocateNewProducerIdBlock(): Unit = {
@@ -126,9 +126,9 @@ class ZkProducerIdManager(brokerId: Int,
def generateProducerId(): Long = {
this synchronized {
// grab a new block of producerIds if this block has been exhausted
- if (nextProducerId > currentProducerIdBlock.producerIdEnd) {
+ if (nextProducerId > currentProducerIdBlock.lastProducerId) {
allocateNewProducerIdBlock()
- nextProducerId = currentProducerIdBlock.producerIdStart
+ nextProducerId = currentProducerIdBlock.firstProducerId
}
nextProducerId += 1
nextProducerId - 1
@@ -158,13 +158,13 @@ class RPCProducerIdManager(brokerId: Int,
nextProducerId += 1
// Check if we need to fetch the next block
- if (nextProducerId >= (currentProducerIdBlock.producerIdStart + currentProducerIdBlock.producerIdLen * ProducerIdManager.PidPrefetchThreshold)) {
+ if (nextProducerId >= (currentProducerIdBlock.firstProducerId + currentProducerIdBlock.size * ProducerIdManager.PidPrefetchThreshold)) {
maybeRequestNextBlock()
}
}
// If we've exhausted the current block, grab the next block (waiting if necessary)
- if (nextProducerId > currentProducerIdBlock.producerIdEnd) {
+ if (nextProducerId > currentProducerIdBlock.lastProducerId) {
val block = nextProducerIdBlock.poll(maxWaitMs, TimeUnit.MILLISECONDS)
if (block == null) {
throw Errors.REQUEST_TIMED_OUT.exception("Timed out waiting for next producer ID block")
@@ -172,7 +172,7 @@ class RPCProducerIdManager(brokerId: Int,
block match {
case Success(nextBlock) =>
currentProducerIdBlock = nextBlock
- nextProducerId = currentProducerIdBlock.producerIdStart
+ nextProducerId = currentProducerIdBlock.firstProducerId
case Failure(t) => throw t
}
}
@@ -212,7 +212,7 @@ class RPCProducerIdManager(brokerId: Int,
case Errors.NONE =>
debug(s"Got next producer ID block from controller $data")
// Do some sanity checks on the response
- if (data.producerIdStart() < currentProducerIdBlock.producerIdEnd) {
+ if (data.producerIdStart() < currentProducerIdBlock.lastProducerId) {
nextProducerIdBlock.put(Failure(new KafkaException(
s"Producer ID block is not monotonic with current block: current=$currentProducerIdBlock response=$data")))
} else if (data.producerIdStart() < 0 || data.producerIdLen() < 0 || data.producerIdStart() > Long.MaxValue - data.producerIdLen()) {
diff --git a/core/src/main/scala/kafka/zk/ZkData.scala b/core/src/main/scala/kafka/zk/ZkData.scala
index 0f6db4a..baed563 100644
--- a/core/src/main/scala/kafka/zk/ZkData.scala
+++ b/core/src/main/scala/kafka/zk/ZkData.scala
@@ -772,9 +772,9 @@ object ProducerIdBlockZNode {
def generateProducerIdBlockJson(producerIdBlock: ProducerIdsBlock): Array[Byte] = {
Json.encodeAsBytes(Map("version" -> CurrentVersion,
- "broker" -> producerIdBlock.brokerId,
- "block_start" -> producerIdBlock.producerIdStart.toString,
- "block_end" -> producerIdBlock.producerIdEnd.toString).asJava
+ "broker" -> producerIdBlock.assignedBrokerId,
+ "block_start" -> producerIdBlock.firstProducerId.toString,
+ "block_end" -> producerIdBlock.lastProducerId.toString).asJava
)
}
diff --git a/metadata/src/main/java/org/apache/kafka/controller/ProducerIdControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/ProducerIdControlManager.java
index 7291f93..d6491e2 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/ProducerIdControlManager.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/ProducerIdControlManager.java
@@ -33,49 +33,50 @@ import java.util.List;
public class ProducerIdControlManager {
private final ClusterControlManager clusterControlManager;
- private final TimelineLong lastProducerId;
+ private final TimelineLong nextProducerId; // Initializes to 0
ProducerIdControlManager(ClusterControlManager clusterControlManager, SnapshotRegistry snapshotRegistry) {
this.clusterControlManager = clusterControlManager;
- this.lastProducerId = new TimelineLong(snapshotRegistry);
+ this.nextProducerId = new TimelineLong(snapshotRegistry);
}
ControllerResult<ProducerIdsBlock> generateNextProducerId(int brokerId, long brokerEpoch) {
clusterControlManager.checkBrokerEpoch(brokerId, brokerEpoch);
- long producerId = lastProducerId.get();
-
- if (producerId > Long.MAX_VALUE - ProducerIdsBlock.PRODUCER_ID_BLOCK_SIZE) {
+ long firstProducerIdInBlock = nextProducerId.get();
+ if (firstProducerIdInBlock > Long.MAX_VALUE - ProducerIdsBlock.PRODUCER_ID_BLOCK_SIZE) {
throw new UnknownServerException("Exhausted all producerIds as the next block's end producerId " +
- "is will has exceeded long type limit");
+ "has exceeded the int64 type limit");
}
- long nextProducerId = producerId + ProducerIdsBlock.PRODUCER_ID_BLOCK_SIZE;
+ ProducerIdsBlock block = new ProducerIdsBlock(brokerId, firstProducerIdInBlock, ProducerIdsBlock.PRODUCER_ID_BLOCK_SIZE);
+ long newNextProducerId = block.nextBlockFirstId();
+
ProducerIdsRecord record = new ProducerIdsRecord()
- .setProducerIdsEnd(nextProducerId)
+ .setNextProducerId(newNextProducerId)
.setBrokerId(brokerId)
.setBrokerEpoch(brokerEpoch);
- ProducerIdsBlock block = new ProducerIdsBlock(brokerId, producerId, ProducerIdsBlock.PRODUCER_ID_BLOCK_SIZE);
return ControllerResult.of(Collections.singletonList(new ApiMessageAndVersion(record, (short) 0)), block);
}
void replay(ProducerIdsRecord record) {
- long currentProducerId = lastProducerId.get();
- if (record.producerIdsEnd() <= currentProducerId) {
- throw new RuntimeException("Producer ID from record is not monotonically increasing");
+ long currentNextProducerId = nextProducerId.get();
+ if (record.nextProducerId() <= currentNextProducerId) {
+ throw new RuntimeException("Next Producer ID from replayed record (" + record.nextProducerId() + ")" +
+ " is not greater than current next Producer ID (" + currentNextProducerId + ")");
} else {
- lastProducerId.set(record.producerIdsEnd());
+ nextProducerId.set(record.nextProducerId());
}
}
Iterator<List<ApiMessageAndVersion>> iterator(long epoch) {
List<ApiMessageAndVersion> records = new ArrayList<>(1);
- long producerId = lastProducerId.get(epoch);
+ long producerId = nextProducerId.get(epoch);
if (producerId > 0) {
records.add(new ApiMessageAndVersion(
new ProducerIdsRecord()
- .setProducerIdsEnd(producerId)
+ .setNextProducerId(producerId)
.setBrokerId(0)
.setBrokerEpoch(0L),
(short) 0));
diff --git a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
index 1bdd6e3..825726f 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
@@ -1451,8 +1451,8 @@ public final class QuorumController implements Controller {
return appendWriteEvent("allocateProducerIds",
() -> producerIdControlManager.generateNextProducerId(request.brokerId(), request.brokerEpoch()))
.thenApply(result -> new AllocateProducerIdsResponseData()
- .setProducerIdStart(result.producerIdStart())
- .setProducerIdLen(result.producerIdLen()));
+ .setProducerIdStart(result.firstProducerId())
+ .setProducerIdLen(result.size()));
}
@Override
diff --git a/metadata/src/main/java/org/apache/kafka/image/ProducerIdsDelta.java b/metadata/src/main/java/org/apache/kafka/image/ProducerIdsDelta.java
index c0c43ea..99dd207 100644
--- a/metadata/src/main/java/org/apache/kafka/image/ProducerIdsDelta.java
+++ b/metadata/src/main/java/org/apache/kafka/image/ProducerIdsDelta.java
@@ -21,18 +21,18 @@ import org.apache.kafka.common.metadata.ProducerIdsRecord;
public final class ProducerIdsDelta {
- private long highestSeenProducerId;
+ private long nextProducerId;
public ProducerIdsDelta(ProducerIdsImage image) {
- this.highestSeenProducerId = image.highestSeenProducerId();
+ this.nextProducerId = image.highestSeenProducerId();
}
- public void setHighestSeenProducerId(long highestSeenProducerId) {
- this.highestSeenProducerId = highestSeenProducerId;
+ public void setNextProducerId(long highestSeenProducerId) {
+ this.nextProducerId = highestSeenProducerId;
}
- public long highestSeenProducerId() {
- return highestSeenProducerId;
+ public long nextProducerId() {
+ return nextProducerId;
}
public void finishSnapshot() {
@@ -40,10 +40,10 @@ public final class ProducerIdsDelta {
}
public void replay(ProducerIdsRecord record) {
- highestSeenProducerId = record.producerIdsEnd();
+ nextProducerId = record.nextProducerId();
}
public ProducerIdsImage apply() {
- return new ProducerIdsImage(highestSeenProducerId);
+ return new ProducerIdsImage(nextProducerId);
}
}
diff --git a/metadata/src/main/java/org/apache/kafka/image/ProducerIdsImage.java b/metadata/src/main/java/org/apache/kafka/image/ProducerIdsImage.java
index 1d497e0..37851f9 100644
--- a/metadata/src/main/java/org/apache/kafka/image/ProducerIdsImage.java
+++ b/metadata/src/main/java/org/apache/kafka/image/ProducerIdsImage.java
@@ -34,23 +34,23 @@ import java.util.function.Consumer;
public final class ProducerIdsImage {
public final static ProducerIdsImage EMPTY = new ProducerIdsImage(-1L);
- private final long highestSeenProducerId;
+ private final long nextProducerId;
- public ProducerIdsImage(long highestSeenProducerId) {
- this.highestSeenProducerId = highestSeenProducerId;
+ public ProducerIdsImage(long nextProducerId) {
+ this.nextProducerId = nextProducerId;
}
public long highestSeenProducerId() {
- return highestSeenProducerId;
+ return nextProducerId;
}
public void write(Consumer<List<ApiMessageAndVersion>> out) {
- if (highestSeenProducerId >= 0) {
+ if (nextProducerId >= 0) {
out.accept(Collections.singletonList(new ApiMessageAndVersion(
new ProducerIdsRecord().
setBrokerId(-1).
setBrokerEpoch(-1).
- setProducerIdsEnd(highestSeenProducerId), (short) 0)));
+ setNextProducerId(nextProducerId), (short) 0)));
}
}
@@ -58,20 +58,20 @@ public final class ProducerIdsImage {
public boolean equals(Object o) {
if (!(o instanceof ProducerIdsImage)) return false;
ProducerIdsImage other = (ProducerIdsImage) o;
- return highestSeenProducerId == other.highestSeenProducerId;
+ return nextProducerId == other.nextProducerId;
}
@Override
public int hashCode() {
- return Objects.hash(highestSeenProducerId);
+ return Objects.hash(nextProducerId);
}
@Override
public String toString() {
- return "ProducerIdsImage(highestSeenProducerId=" + highestSeenProducerId + ")";
+ return "ProducerIdsImage(highestSeenProducerId=" + nextProducerId + ")";
}
public boolean isEmpty() {
- return highestSeenProducerId < 0;
+ return nextProducerId < 0;
}
}
diff --git a/metadata/src/main/resources/common/metadata/ProducerIdsRecord.json b/metadata/src/main/resources/common/metadata/ProducerIdsRecord.json
index 0467871..9f0242c 100644
--- a/metadata/src/main/resources/common/metadata/ProducerIdsRecord.json
+++ b/metadata/src/main/resources/common/metadata/ProducerIdsRecord.json
@@ -24,7 +24,7 @@
"about": "The ID of the requesting broker" },
{ "name": "BrokerEpoch", "type": "int64", "versions": "0+", "default": "-1",
"about": "The epoch of the requesting broker" },
- { "name": "ProducerIdsEnd", "type": "int64", "versions": "0+",
- "about": "The highest producer ID that has been generated"}
+ { "name": "NextProducerId", "type": "int64", "versions": "0+",
+ "about": "The next producerId that will be assigned (i.e. the first producerId in the next assigned block)"}
]
}
diff --git a/metadata/src/test/java/org/apache/kafka/controller/ProducerIdControlManagerTest.java b/metadata/src/test/java/org/apache/kafka/controller/ProducerIdControlManagerTest.java
index 41cee2e..2161360 100644
--- a/metadata/src/test/java/org/apache/kafka/controller/ProducerIdControlManagerTest.java
+++ b/metadata/src/test/java/org/apache/kafka/controller/ProducerIdControlManagerTest.java
@@ -76,10 +76,10 @@ public class ProducerIdControlManagerTest {
public void testInitialResult() {
ControllerResult<ProducerIdsBlock> result =
producerIdControlManager.generateNextProducerId(1, 100);
- assertEquals(0, result.response().producerIdStart());
- assertEquals(1000, result.response().producerIdLen());
+ assertEquals(0, result.response().firstProducerId());
+ assertEquals(1000, result.response().size());
ProducerIdsRecord record = (ProducerIdsRecord) result.records().get(0).message();
- assertEquals(1000, record.producerIdsEnd());
+ assertEquals(1000, record.nextProducerId());
}
@Test
@@ -88,11 +88,11 @@ public class ProducerIdControlManagerTest {
new ProducerIdsRecord()
.setBrokerId(1)
.setBrokerEpoch(100)
- .setProducerIdsEnd(42));
+ .setNextProducerId(42));
ProducerIdsBlock range =
producerIdControlManager.generateNextProducerId(1, 100).response();
- assertEquals(42, range.producerIdStart());
+ assertEquals(42, range.firstProducerId());
// Can't go backwards in Producer IDs
assertThrows(RuntimeException.class, () -> {
@@ -100,19 +100,19 @@ public class ProducerIdControlManagerTest {
new ProducerIdsRecord()
.setBrokerId(1)
.setBrokerEpoch(100)
- .setProducerIdsEnd(40));
+ .setNextProducerId(40));
}, "Producer ID range must only increase");
range = producerIdControlManager.generateNextProducerId(1, 100).response();
- assertEquals(42, range.producerIdStart());
+ assertEquals(42, range.firstProducerId());
// Gaps in the ID range are okay.
producerIdControlManager.replay(
new ProducerIdsRecord()
.setBrokerId(1)
.setBrokerEpoch(100)
- .setProducerIdsEnd(50));
+ .setNextProducerId(50));
range = producerIdControlManager.generateNextProducerId(1, 100).response();
- assertEquals(50, range.producerIdStart());
+ assertEquals(50, range.firstProducerId());
}
@Test
@@ -132,7 +132,7 @@ public class ProducerIdControlManagerTest {
new ProducerIdsRecord()
.setBrokerId(1)
.setBrokerEpoch(100)
- .setProducerIdsEnd(Long.MAX_VALUE - 1));
+ .setNextProducerId(Long.MAX_VALUE - 1));
assertThrows(UnknownServerException.class, () ->
producerIdControlManager.generateNextProducerId(1, 100));
@@ -149,7 +149,7 @@ public class ProducerIdControlManagerTest {
assertTrue(snapshotIterator.hasNext());
List<ApiMessageAndVersion> batch = snapshotIterator.next();
assertEquals(1, batch.size(), "Producer IDs record batch should only contain a single record");
- assertEquals(range.producerIdStart() + range.producerIdLen(), ((ProducerIdsRecord) batch.get(0).message()).producerIdsEnd());
+ assertEquals(range.firstProducerId() + range.size(), ((ProducerIdsRecord) batch.get(0).message()).nextProducerId());
assertFalse(snapshotIterator.hasNext(), "Producer IDs iterator should only contain a single batch");
ProducerIdControlManager newProducerIdManager = new ProducerIdControlManager(clusterControl, snapshotRegistry);
@@ -159,9 +159,9 @@ public class ProducerIdControlManagerTest {
}
// Verify that after reloading state from this "snapshot", we don't produce any overlapping IDs
- long lastProducerID = range.producerIdStart() + range.producerIdLen() - 1;
+ long lastProducerID = range.firstProducerId() + range.size() - 1;
range = generateProducerIds(producerIdControlManager, 1, 100);
- assertTrue(range.producerIdStart() > lastProducerID);
+ assertTrue(range.firstProducerId() > lastProducerID);
}
static ProducerIdsBlock generateProducerIds(
diff --git a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
index 84585f3..4035f68 100644
--- a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
+++ b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
@@ -565,7 +565,7 @@ public class QuorumControllerTest {
new ApiMessageAndVersion(new ProducerIdsRecord().
setBrokerId(0).
setBrokerEpoch(brokerEpochs.get(0)).
- setProducerIdsEnd(1000), (short) 0)
+ setNextProducerId(1000), (short) 0)
);
}
diff --git a/metadata/src/test/java/org/apache/kafka/image/ProducerIdsImageTest.java b/metadata/src/test/java/org/apache/kafka/image/ProducerIdsImageTest.java
index 2b147bd..819186d 100644
--- a/metadata/src/test/java/org/apache/kafka/image/ProducerIdsImageTest.java
+++ b/metadata/src/test/java/org/apache/kafka/image/ProducerIdsImageTest.java
@@ -46,11 +46,11 @@ public class ProducerIdsImageTest {
DELTA1_RECORDS.add(new ApiMessageAndVersion(new ProducerIdsRecord().
setBrokerId(2).
setBrokerEpoch(100).
- setProducerIdsEnd(456), (short) 0));
+ setNextProducerId(456), (short) 0));
DELTA1_RECORDS.add(new ApiMessageAndVersion(new ProducerIdsRecord().
setBrokerId(3).
setBrokerEpoch(100).
- setProducerIdsEnd(789), (short) 0));
+ setNextProducerId(789), (short) 0));
DELTA1 = new ProducerIdsDelta(IMAGE1);
RecordTestUtils.replayAll(DELTA1, DELTA1_RECORDS);
diff --git a/server-common/src/main/java/org/apache/kafka/server/common/ProducerIdsBlock.java b/server-common/src/main/java/org/apache/kafka/server/common/ProducerIdsBlock.java
index 8a0fd84..b2633bf 100644
--- a/server-common/src/main/java/org/apache/kafka/server/common/ProducerIdsBlock.java
+++ b/server-common/src/main/java/org/apache/kafka/server/common/ProducerIdsBlock.java
@@ -29,40 +29,58 @@ public class ProducerIdsBlock {
public static final ProducerIdsBlock EMPTY = new ProducerIdsBlock(-1, 0, 0);
- private final int brokerId;
- private final long producerIdStart;
- private final int producerIdLen;
+ private final int assignedBrokerId;
+ private final long firstProducerId;
+ private final int blockSize;
- public ProducerIdsBlock(int brokerId, long producerIdStart, int producerIdLen) {
- this.brokerId = brokerId;
- this.producerIdStart = producerIdStart;
- this.producerIdLen = producerIdLen;
+ public ProducerIdsBlock(int assignedBrokerId, long firstProducerId, int blockSize) {
+ this.assignedBrokerId = assignedBrokerId;
+ this.firstProducerId = firstProducerId;
+ this.blockSize = blockSize;
}
- public int brokerId() {
- return brokerId;
+ /**
+ * Get the ID of the broker that this block was assigned to.
+ */
+ public int assignedBrokerId() {
+ return assignedBrokerId;
}
- public long producerIdStart() {
- return producerIdStart;
+ /**
+ * Get the first ID (inclusive) to be assigned from this block.
+ */
+ public long firstProducerId() {
+ return firstProducerId;
}
- public int producerIdLen() {
- return producerIdLen;
+ /**
+ * Get the number of IDs contained in this block.
+ */
+ public int size() {
+ return blockSize;
}
- public long producerIdEnd() {
- return producerIdStart + producerIdLen - 1;
+ /**
+ * Get the last ID (inclusive) to be assigned from this block.
+ */
+ public long lastProducerId() {
+ return firstProducerId + blockSize - 1;
}
+ /**
+ * Get the first ID of the next block following this one.
+ */
+ public long nextBlockFirstId() {
+ return firstProducerId + blockSize;
+ }
@Override
public String toString() {
- return "ProducerIdsBlock{" +
- "brokerId=" + brokerId +
- ", producerIdStart=" + producerIdStart +
- ", producerIdLen=" + producerIdLen +
- '}';
+ return "ProducerIdsBlock(" +
+ "assignedBrokerId=" + assignedBrokerId +
+ ", firstProducerId=" + firstProducerId +
+ ", size=" + blockSize +
+ ')';
}
@Override
@@ -70,11 +88,11 @@ public class ProducerIdsBlock {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
ProducerIdsBlock that = (ProducerIdsBlock) o;
- return brokerId == that.brokerId && producerIdStart == that.producerIdStart && producerIdLen == that.producerIdLen;
+ return assignedBrokerId == that.assignedBrokerId && firstProducerId == that.firstProducerId && blockSize == that.blockSize;
}
@Override
public int hashCode() {
- return Objects.hash(brokerId, producerIdStart, producerIdLen);
+ return Objects.hash(assignedBrokerId, firstProducerId, blockSize);
}
}
diff --git a/server-common/src/test/java/org/apache/kafka/server/common/ProducerIdsBlockTest.java b/server-common/src/test/java/org/apache/kafka/server/common/ProducerIdsBlockTest.java
new file mode 100644
index 0000000..f15c171
--- /dev/null
+++ b/server-common/src/test/java/org/apache/kafka/server/common/ProducerIdsBlockTest.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.common;
+
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+class ProducerIdsBlockTest {
+
+ @Test
+ public void testEmptyBlock() {
+ assertEquals(-1, ProducerIdsBlock.EMPTY.lastProducerId());
+ assertEquals(0, ProducerIdsBlock.EMPTY.nextBlockFirstId());
+ assertEquals(0, ProducerIdsBlock.EMPTY.size());
+ }
+
+ @Test
+ public void testDynamicBlock() {
+ long firstId = 1309418324L;
+ int blockSize = 5391;
+ int brokerId = 5;
+
+ ProducerIdsBlock block = new ProducerIdsBlock(brokerId, firstId, blockSize);
+ assertEquals(firstId, block.firstProducerId());
+ assertEquals(firstId + blockSize - 1, block.lastProducerId());
+ assertEquals(firstId + blockSize, block.nextBlockFirstId());
+ assertEquals(blockSize, block.size());
+ assertEquals(brokerId, block.assignedBrokerId());
+ }
+
+}
\ No newline at end of file