Posted to commits@kafka.apache.org by jg...@apache.org on 2017/05/15 18:28:47 UTC
kafka git commit: MINOR: Eliminate PID terminology from non test code
Repository: kafka
Updated Branches:
refs/heads/trunk e40e27b4e -> 3e6669000
MINOR: Eliminate PID terminology from non test code
Producer id is used instead.
Also refactored the TransactionLog schema code to follow
our naming conventions and to improve its structure.
Author: Ismael Juma <is...@juma.me.uk>
Reviewers: Guozhang Wang <wa...@gmail.com>, Jason Gustafson <ja...@confluent.io>
Closes #3041 from ijuma/eliminate-pid-terminology
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/3e666900
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/3e666900
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/3e666900
Branch: refs/heads/trunk
Commit: 3e6669000f082808999a7216b00c4b0f5a94e1da
Parents: e40e27b
Author: Ismael Juma <is...@juma.me.uk>
Authored: Mon May 15 11:26:08 2017 -0700
Committer: Jason Gustafson <ja...@confluent.io>
Committed: Mon May 15 11:26:08 2017 -0700
----------------------------------------------------------------------
.../producer/internals/RecordAccumulator.java | 10 +-
.../clients/producer/internals/Sender.java | 27 +--
.../producer/internals/TransactionManager.java | 10 +-
.../apache/kafka/common/protocol/Errors.java | 4 +-
.../kafka/common/record/DefaultRecordBatch.java | 4 +-
.../common/record/MemoryRecordsBuilder.java | 10 +-
.../apache/kafka/common/record/RecordBatch.java | 4 +-
.../common/requests/AddOffsetsToTxnRequest.java | 6 +-
.../requests/AddPartitionsToTxnRequest.java | 12 +-
.../kafka/common/requests/EndTxnRequest.java | 12 +-
.../kafka/common/requests/FetchResponse.java | 6 +-
.../common/requests/TxnOffsetCommitRequest.java | 12 +-
.../common/requests/WriteTxnMarkersRequest.java | 12 +-
.../requests/WriteTxnMarkersResponse.java | 6 +-
.../clients/producer/internals/SenderTest.java | 4 +-
.../internals/TransactionManagerTest.java | 4 +-
.../kafka/coordinator/group/GroupMetadata.scala | 2 +-
.../transaction/TransactionLog.scala | 193 ++++++++++---------
...nsactionMarkerRequestCompletionHandler.scala | 2 +-
.../transaction/TransactionMetadata.scala | 6 +-
.../transaction/TransactionStateManager.scala | 2 +-
core/src/main/scala/kafka/log/Log.scala | 22 +--
core/src/main/scala/kafka/log/LogManager.scala | 4 +-
.../scala/kafka/log/ProducerStateManager.scala | 22 +--
.../main/scala/kafka/server/KafkaConfig.scala | 8 +-
.../kafka/tools/ConsumerOffsetChecker.scala | 20 +-
.../scala/kafka/tools/DumpLogSegments.scala | 2 +-
core/src/main/scala/kafka/utils/ZkUtils.scala | 2 +-
.../src/test/scala/unit/kafka/log/LogTest.scala | 6 +-
29 files changed, 221 insertions(+), 213 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/3e666900/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
index cf3736c..d53c19d 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
@@ -446,19 +446,19 @@ public final class RecordAccumulator {
} else {
ProducerIdAndEpoch producerIdAndEpoch = null;
if (transactionManager != null) {
- producerIdAndEpoch = transactionManager.pidAndEpoch();
+ producerIdAndEpoch = transactionManager.producerIdAndEpoch();
if (!producerIdAndEpoch.isValid())
- // we cannot send the batch until we have refreshed the PID
+ // we cannot send the batch until we have refreshed the producer id
break;
}
ProducerBatch batch = deque.pollFirst();
if (producerIdAndEpoch != null && !batch.inRetry()) {
- // If the batch is in retry, then we should not change the pid and
+ // If the batch is in retry, then we should not change the producer id and
// sequence number, since this may introduce duplicates. In particular,
// the previous attempt may actually have been accepted, and if we change
- // the pid and sequence here, this attempt will also be accepted, causing
- // a duplicate.
+ // the producer id and sequence here, this attempt will also be accepted,
+ // causing a duplicate.
int sequenceNumber = transactionManager.sequenceNumber(batch.topicPartition);
log.debug("Dest: {} : producerId: {}, epoch: {}, Assigning sequence for {}: {}",
node, producerIdAndEpoch.producerId, producerIdAndEpoch.epoch,
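The comment above captures the core idempotence invariant: a batch's producer id and sequence are stamped exactly once, at drain time, and a batch in retry keeps the stamp from its first attempt. A minimal sketch of that rule, using hypothetical SimpleBatch/drainOne names rather than Kafka's actual classes:

import java.util.ArrayDeque;
import java.util.Deque;

public class DrainSketch {
    // Hypothetical stand-in for ProducerBatch: tracks whether producer state
    // has already been stamped onto this batch.
    static final class SimpleBatch {
        long producerId = -1L;
        int baseSequence = -1;
        boolean inRetry() { return baseSequence != -1; }
    }

    static int nextSequence = 0;

    static SimpleBatch drainOne(Deque<SimpleBatch> deque, long producerId) {
        SimpleBatch batch = deque.pollFirst();
        if (batch != null && !batch.inRetry()) {
            // First attempt: safe to stamp producer id and sequence now.
            batch.producerId = producerId;
            batch.baseSequence = nextSequence++;
        }
        // On retry the old stamp is kept: the earlier attempt may have been
        // accepted, and a fresh sequence would let this copy be accepted too.
        return batch;
    }

    public static void main(String[] args) {
        Deque<SimpleBatch> deque = new ArrayDeque<>();
        deque.add(new SimpleBatch());
        SimpleBatch batch = drainOne(deque, 42L);
        deque.add(batch); // simulate a retriable send failure: re-enqueue
        SimpleBatch retried = drainOne(deque, 42L);
        System.out.println(retried.baseSequence); // still 0: not re-stamped
    }
}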
http://git-wip-us.apache.org/repos/asf/kafka/blob/3e666900/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
index 8b96b41..da09a1a 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
@@ -108,7 +108,7 @@ public class Sender implements Runnable {
/* current request API versions supported by the known brokers */
private final ApiVersions apiVersions;
- /* all the state related to transactions, in particular the PID, epoch, and sequence numbers */
+ /* all the state related to transactions, in particular the producer id, producer epoch, and sequence numbers */
private final TransactionManager transactionManager;
public Sender(KafkaClient client,
@@ -197,7 +197,7 @@ public class Sender implements Runnable {
private long sendProducerData(long now) {
Cluster cluster = metadata.fetch();
- maybeWaitForPid();
+ maybeWaitForProducerId();
// get the list of partitions with data ready to send
RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(cluster, now);
@@ -237,7 +237,7 @@ public class Sender implements Runnable {
List<ProducerBatch> expiredBatches = this.accumulator.abortExpiredBatches(this.requestTimeout, now);
boolean needsTransactionStateReset = false;
- // Reset the PID if an expired batch has previously been sent to the broker. Also update the metrics
+ // Reset the producer id if an expired batch has previously been sent to the broker. Also update the metrics
// for expired batches. see the documentation of @TransactionState.resetProducerId to understand why
// we need to reset the producer id here.
for (ProducerBatch expiredBatch : expiredBatches) {
@@ -370,8 +370,8 @@ public class Sender implements Runnable {
return null;
}
- private void maybeWaitForPid() {
- // If this is a transactional producer, the PID will be received when recovering transactions in the
+ private void maybeWaitForProducerId() {
+ // If this is a transactional producer, the producer id will be received when recovering transactions in the
// initTransactions() method of the producer.
if (transactionManager == null || transactionManager.isTransactional())
return;
@@ -395,7 +395,7 @@ public class Sender implements Runnable {
"We will back off and try again.");
}
} catch (Exception e) {
- log.warn("Received an exception while trying to get a pid. Will back off and retry.", e);
+ log.warn("Received an exception while trying to get a producer id. Will back off and retry.", e);
}
log.trace("Retry InitProducerIdRequest in {}ms.", retryBackoffMs);
time.sleep(retryBackoffMs);
@@ -459,15 +459,16 @@ public class Sender implements Runnable {
error);
if (transactionManager == null) {
reenqueueBatch(batch, now);
- } else if (transactionManager.pidAndEpoch().producerId == batch.producerId() && transactionManager.pidAndEpoch().epoch == batch.producerEpoch()) {
- // If idempotence is enabled only retry the request if the current PID is the same as the pid of the batch.
+ } else if (transactionManager.producerIdAndEpoch().producerId == batch.producerId() &&
+ transactionManager.producerIdAndEpoch().epoch == batch.producerEpoch()) {
+ // If idempotence is enabled, only retry the request if the current producer id is the same as the producer id of the batch.
log.debug("Retrying batch to topic-partition {}. Sequence number : {}", batch.topicPartition,
transactionManager.sequenceNumber(batch.topicPartition));
reenqueueBatch(batch, now);
} else {
failBatch(batch, response, new OutOfOrderSequenceException("Attempted to retry sending a " +
"batch but the producer id changed from " + batch.producerId() + " to " +
- transactionManager.pidAndEpoch().producerId + " in the mean time. This batch will be dropped."));
+ transactionManager.producerIdAndEpoch().producerId + " in the meantime. This batch will be dropped."));
this.sensors.recordErrors(batch.topicPartition.topic(), batch.recordCount);
}
} else {
@@ -476,7 +477,7 @@ public class Sender implements Runnable {
exception = new TopicAuthorizationException(batch.topicPartition.topic());
else
exception = error.exception();
- if (error == Errors.OUT_OF_ORDER_SEQUENCE_NUMBER && batch.producerId() == transactionManager.pidAndEpoch().producerId)
+ if (error == Errors.OUT_OF_ORDER_SEQUENCE_NUMBER && batch.producerId() == transactionManager.producerIdAndEpoch().producerId)
log.error("The broker received an out of order sequence number for correlation id {}, topic-partition " +
"{} at offset {}. This indicates data loss on the broker, and should be investigated.",
correlationId, batch.topicPartition, response.baseOffset);
@@ -494,8 +495,8 @@ public class Sender implements Runnable {
} else {
completeBatch(batch, response);
- if (transactionManager != null && transactionManager.pidAndEpoch().producerId == batch.producerId()
- && transactionManager.pidAndEpoch().epoch == batch.producerEpoch()) {
+ if (transactionManager != null && transactionManager.producerIdAndEpoch().producerId == batch.producerId()
+ && transactionManager.producerIdAndEpoch().epoch == batch.producerEpoch()) {
transactionManager.incrementSequenceNumber(batch.topicPartition, batch.recordCount);
log.debug("Incremented sequence number for topic-partition {} to {}", batch.topicPartition,
transactionManager.sequenceNumber(batch.topicPartition));
@@ -519,7 +520,7 @@ public class Sender implements Runnable {
private void failBatch(ProducerBatch batch, ProduceResponse.PartitionResponse response, RuntimeException exception) {
if (transactionManager != null && !transactionManager.isTransactional()
- && batch.producerId() == transactionManager.pidAndEpoch().producerId) {
+ && batch.producerId() == transactionManager.producerIdAndEpoch().producerId) {
// Reset the transaction state since we have hit an irrecoverable exception and cannot make any guarantees
// about the previously committed message. Note that this will discard the producer id and sequence
// numbers for all existing partitions.
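For a non-transactional idempotent producer, maybeWaitForProducerId() obtains the producer id through a retry-and-back-off loop around the InitProducerIdRequest round trip. A rough sketch of that loop, where fetchProducerId() is a hypothetical stand-in for the real request:

public class ProducerIdRetrySketch {
    static int attempts = 0;

    // Hypothetical stand-in for the InitProducerIdRequest round trip; fails
    // twice to simulate a broker that is not yet ready.
    static long fetchProducerId() {
        if (attempts++ < 2)
            throw new RuntimeException("coordinator not ready");
        return 4000L;
    }

    public static void main(String[] args) throws InterruptedException {
        long retryBackoffMs = 100L;
        long producerId = -1L;
        while (producerId < 0) {
            try {
                producerId = fetchProducerId();
            } catch (Exception e) {
                System.out.println("Failed to get a producer id: " + e.getMessage()
                        + ". Backing off and retrying.");
                Thread.sleep(retryBackoffMs);
            }
        }
        System.out.println("Received producer id " + producerId);
    }
}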
http://git-wip-us.apache.org/repos/asf/kafka/blob/3e666900/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
index 566ad7c..7e2f813 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
@@ -114,7 +114,7 @@ public class TransactionManager {
// We use the priority to determine the order in which requests need to be sent out. For instance, if we have
- // a pending FindCoordinator request, that must always go first. Next, If we need a PID, that must go second.
+ // a pending FindCoordinator request, that must always go first. Next, if we need a producer id, that must go second.
// The endTxn request must always go last.
private enum Priority {
FIND_COORDINATOR(0),
@@ -262,17 +262,17 @@ public class TransactionManager {
}
/**
- * Get the current pid and epoch without blocking. Callers must use {@link ProducerIdAndEpoch#isValid()} to
+ * Get the current producer id and epoch without blocking. Callers must use {@link ProducerIdAndEpoch#isValid()} to
* verify that the result is valid.
*
* @return the current ProducerIdAndEpoch.
*/
- ProducerIdAndEpoch pidAndEpoch() {
+ ProducerIdAndEpoch producerIdAndEpoch() {
return producerIdAndEpoch;
}
/**
- * Set the pid and epoch atomically.
+ * Set the producer id and epoch atomically.
*/
void setProducerIdAndEpoch(ProducerIdAndEpoch producerIdAndEpoch) {
this.producerIdAndEpoch = producerIdAndEpoch;
@@ -291,7 +291,7 @@ public class TransactionManager {
* messages will return an OutOfOrderSequenceException.
*
* Note that we can't reset the producer state for the transactional producer as this would mean bumping the epoch
- * for the same pid. This might involve aborting the ongoing transaction during the initPidRequest, and the user
+ * for the same producer id. This might involve aborting the ongoing transaction during the initPidRequest, and the user
* would not have any way of knowing this happened. So for the transactional producer, it's best to return the
* produce error to the user and let them abort the transaction and close the producer explicitly.
*/
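producerIdAndEpoch() is deliberately non-blocking, so the isValid() check carries the contract. A small illustrative class showing that pattern; the -1 sentinels mirror Kafka's RecordBatch.NO_PRODUCER_ID and NO_PRODUCER_EPOCH constants, but this is a sketch, not the real ProducerIdAndEpoch:

public class ProducerIdAndEpochSketch {
    static final long NO_PRODUCER_ID = -1L;    // mirrors RecordBatch.NO_PRODUCER_ID
    static final short NO_PRODUCER_EPOCH = -1; // mirrors RecordBatch.NO_PRODUCER_EPOCH

    final long producerId;
    final short epoch;

    ProducerIdAndEpochSketch(long producerId, short epoch) {
        this.producerId = producerId;
        this.epoch = epoch;
    }

    // Non-blocking: callers must check this before using the result.
    boolean isValid() {
        return producerId > NO_PRODUCER_ID;
    }

    public static void main(String[] args) {
        ProducerIdAndEpochSketch current =
                new ProducerIdAndEpochSketch(NO_PRODUCER_ID, NO_PRODUCER_EPOCH);
        if (!current.isValid())
            System.out.println("No producer id yet; the batch cannot be drained.");
    }
}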
http://git-wip-us.apache.org/repos/asf/kafka/blob/3e666900/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
index 58a0a2a..a0922cf 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
@@ -439,8 +439,8 @@ public enum Errors {
return new InvalidTxnStateException(message);
}
}),
- INVALID_PRODUCER_ID_MAPPING(49, "The producer attempted to use a producerId which is not currently assigned to " +
- "its transactionalId",
+ INVALID_PRODUCER_ID_MAPPING(49, "The producer attempted to use a producer id which is not currently assigned to " +
+ "its transactional id",
new ApiExceptionBuilder() {
@Override
public ApiException build(String message) {
http://git-wip-us.apache.org/repos/asf/kafka/blob/3e666900/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java b/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java
index f321c3b..74bd3c0 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java
@@ -361,7 +361,7 @@ public class DefaultRecordBatch extends AbstractRecordBatch implements MutableRe
TimestampType timestampType,
long baseTimestamp,
long maxTimestamp,
- long pid,
+ long producerId,
short epoch,
int sequence,
boolean isTransactional,
@@ -384,7 +384,7 @@ public class DefaultRecordBatch extends AbstractRecordBatch implements MutableRe
buffer.putLong(position + BASE_TIMESTAMP_OFFSET, baseTimestamp);
buffer.putLong(position + MAX_TIMESTAMP_OFFSET, maxTimestamp);
buffer.putInt(position + LAST_OFFSET_DELTA_OFFSET, lastOffsetDelta);
- buffer.putLong(position + PRODUCER_ID_OFFSET, pid);
+ buffer.putLong(position + PRODUCER_ID_OFFSET, producerId);
buffer.putShort(position + PRODUCER_EPOCH_OFFSET, epoch);
buffer.putInt(position + BASE_SEQUENCE_OFFSET, sequence);
buffer.putInt(position + RECORDS_COUNT_OFFSET, numRecords);
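The writes above go to fixed absolute offsets in the v2 batch header, which is what makes it possible to overwrite the producer id of an already-serialized batch in place. A toy example of the same absolute put/get pattern; 43 is the v2 producer id offset, shown here for illustration rather than as a parser to rely on:

import java.nio.ByteBuffer;

public class BatchHeaderSketch {
    static final int PRODUCER_ID_OFFSET = 43; // v2 batch header offset (illustrative)

    public static void main(String[] args) {
        ByteBuffer buffer = ByteBuffer.allocate(61); // v2 header size
        buffer.putLong(PRODUCER_ID_OFFSET, 4000L);   // absolute write, position unchanged
        System.out.println(buffer.getLong(PRODUCER_ID_OFFSET)); // 4000
    }
}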
http://git-wip-us.apache.org/repos/asf/kafka/blob/3e666900/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
index 025b402..e52df76 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
@@ -95,7 +95,7 @@ public class MemoryRecordsBuilder {
* @param timestampType The desired timestamp type. For magic > 0, this cannot be {@link TimestampType#NO_TIMESTAMP_TYPE}.
* @param baseOffset The initial offset to use for
* @param logAppendTime The log append time of this record set. Can be set to NO_TIMESTAMP if CREATE_TIME is used.
- * @param producerId The producer ID (PID) associated with the producer writing this record set
+ * @param producerId The producer ID associated with the producer writing this record set
* @param producerEpoch The epoch of the producer
* @param baseSequence The sequence number of the first record in this set
* @param isTransactional Whether or not the records are part of a transaction
@@ -212,15 +212,15 @@ public class MemoryRecordsBuilder {
}
}
- public void setProducerState(long pid, short epoch, int baseSequence) {
+ public void setProducerState(long producerId, short epoch, int baseSequence) {
if (isClosed()) {
// Sequence numbers are assigned when the batch is closed while the accumulator is being drained.
// If the resulting ProduceRequest to the partition leader failed for a retriable error, the batch will
- // be re queued. In this case, we should not attempt to set the state again, since changing the pid and sequence
+ // be re-queued. In this case, we should not attempt to set the state again, since changing the producerId and sequence
// once a batch has been sent to the broker risks introducing duplicates.
throw new IllegalStateException("Trying to set producer state of an already closed batch. This indicates a bug on the client.");
}
- this.producerId = pid;
+ this.producerId = producerId;
this.producerEpoch = epoch;
this.baseSequence = baseSequence;
}
@@ -691,7 +691,7 @@ public class MemoryRecordsBuilder {
}
/**
- * Return the ProducerId (PID) of the RecordBatches created by this builder.
+ * Return the producer id of the RecordBatches created by this builder.
*/
public long producerId() {
return this.producerId;
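The closed-batch guard in setProducerState is worth spelling out: once a batch is closed (and possibly on the wire), its producer state is frozen. A minimal sketch of that guard, with simplified fields rather than MemoryRecordsBuilder's real state:

public class ClosedBatchGuardSketch {
    private boolean closed = false;
    private long producerId = -1L;

    void close() { closed = true; }

    void setProducerState(long producerId) {
        // Once closed, the batch may already have been sent; re-stamping its
        // producer state risks duplicates, so refuse outright.
        if (closed)
            throw new IllegalStateException("Trying to set producer state of an already closed batch.");
        this.producerId = producerId;
    }

    public static void main(String[] args) {
        ClosedBatchGuardSketch batch = new ClosedBatchGuardSketch();
        batch.setProducerState(4000L); // fine: batch still open
        batch.close();
        try {
            batch.setProducerState(4001L);
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}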
http://git-wip-us.apache.org/repos/asf/kafka/blob/3e666900/clients/src/main/java/org/apache/kafka/common/record/RecordBatch.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/RecordBatch.java b/clients/src/main/java/org/apache/kafka/common/record/RecordBatch.java
index c984deb..42b0c2e 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/RecordBatch.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/RecordBatch.java
@@ -132,9 +132,9 @@ public interface RecordBatch extends Iterable<Record> {
byte magic();
/**
- * Get the producer ID (PID) for this log record batch. For older magic versions, this will return 0.
+ * Get the producer id for this log record batch. For older magic versions, this will return 0.
*
- * @return The PID or -1 if there is none
+ * @return The producer id or -1 if there is none
*/
long producerId();
http://git-wip-us.apache.org/repos/asf/kafka/blob/3e666900/clients/src/main/java/org/apache/kafka/common/requests/AddOffsetsToTxnRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AddOffsetsToTxnRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/AddOffsetsToTxnRequest.java
index b017242..4bf8b3e 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/AddOffsetsToTxnRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/AddOffsetsToTxnRequest.java
@@ -24,7 +24,7 @@ import java.nio.ByteBuffer;
public class AddOffsetsToTxnRequest extends AbstractRequest {
private static final String TRANSACTIONAL_ID_KEY_NAME = "transactional_id";
- private static final String PID_KEY_NAME = "producer_id";
+ private static final String PRODUCER_ID_KEY_NAME = "producer_id";
private static final String EPOCH_KEY_NAME = "producer_epoch";
private static final String CONSUMER_GROUP_ID_KEY_NAME = "consumer_group_id";
@@ -68,7 +68,7 @@ public class AddOffsetsToTxnRequest extends AbstractRequest {
public AddOffsetsToTxnRequest(Struct struct, short version) {
super(version);
this.transactionalId = struct.getString(TRANSACTIONAL_ID_KEY_NAME);
- this.producerId = struct.getLong(PID_KEY_NAME);
+ this.producerId = struct.getLong(PRODUCER_ID_KEY_NAME);
this.producerEpoch = struct.getShort(EPOCH_KEY_NAME);
this.consumerGroupId = struct.getString(CONSUMER_GROUP_ID_KEY_NAME);
}
@@ -93,7 +93,7 @@ public class AddOffsetsToTxnRequest extends AbstractRequest {
protected Struct toStruct() {
Struct struct = new Struct(ApiKeys.ADD_OFFSETS_TO_TXN.requestSchema(version()));
struct.set(TRANSACTIONAL_ID_KEY_NAME, transactionalId);
- struct.set(PID_KEY_NAME, producerId);
+ struct.set(PRODUCER_ID_KEY_NAME, producerId);
struct.set(EPOCH_KEY_NAME, producerEpoch);
struct.set(CONSUMER_GROUP_ID_KEY_NAME, consumerGroupId);
return struct;
http://git-wip-us.apache.org/repos/asf/kafka/blob/3e666900/clients/src/main/java/org/apache/kafka/common/requests/AddPartitionsToTxnRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AddPartitionsToTxnRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/AddPartitionsToTxnRequest.java
index 5bbea61..69ae25c 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/AddPartitionsToTxnRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/AddPartitionsToTxnRequest.java
@@ -29,8 +29,8 @@ import java.util.Map;
public class AddPartitionsToTxnRequest extends AbstractRequest {
private static final String TRANSACTIONAL_ID_KEY_NAME = "transactional_id";
- private static final String PID_KEY_NAME = "producer_id";
- private static final String EPOCH_KEY_NAME = "producer_epoch";
+ private static final String PRODUCER_ID_KEY_NAME = "producer_id";
+ private static final String PRODUCER_EPOCH_KEY_NAME = "producer_epoch";
private static final String TOPIC_PARTITIONS_KEY_NAME = "topics";
private static final String TOPIC_KEY_NAME = "topic";
private static final String PARTITIONS_KEY_NAME = "partitions";
@@ -72,8 +72,8 @@ public class AddPartitionsToTxnRequest extends AbstractRequest {
public AddPartitionsToTxnRequest(Struct struct, short version) {
super(version);
this.transactionalId = struct.getString(TRANSACTIONAL_ID_KEY_NAME);
- this.producerId = struct.getLong(PID_KEY_NAME);
- this.producerEpoch = struct.getShort(EPOCH_KEY_NAME);
+ this.producerId = struct.getLong(PRODUCER_ID_KEY_NAME);
+ this.producerEpoch = struct.getShort(PRODUCER_EPOCH_KEY_NAME);
List<TopicPartition> partitions = new ArrayList<>();
Object[] topicPartitionsArray = struct.getArray(TOPIC_PARTITIONS_KEY_NAME);
@@ -107,8 +107,8 @@ public class AddPartitionsToTxnRequest extends AbstractRequest {
protected Struct toStruct() {
Struct struct = new Struct(ApiKeys.ADD_PARTITIONS_TO_TXN.requestSchema(version()));
struct.set(TRANSACTIONAL_ID_KEY_NAME, transactionalId);
- struct.set(PID_KEY_NAME, producerId);
- struct.set(EPOCH_KEY_NAME, producerEpoch);
+ struct.set(PRODUCER_ID_KEY_NAME, producerId);
+ struct.set(PRODUCER_EPOCH_KEY_NAME, producerEpoch);
Map<String, List<Integer>> mappedPartitions = CollectionUtils.groupDataByTopic(partitions);
Object[] partitionsArray = new Object[mappedPartitions.size()];
http://git-wip-us.apache.org/repos/asf/kafka/blob/3e666900/clients/src/main/java/org/apache/kafka/common/requests/EndTxnRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/EndTxnRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/EndTxnRequest.java
index 9c215be..ff9b82c 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/EndTxnRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/EndTxnRequest.java
@@ -24,8 +24,8 @@ import java.nio.ByteBuffer;
public class EndTxnRequest extends AbstractRequest {
private static final String TRANSACTIONAL_ID_KEY_NAME = "transactional_id";
- private static final String PID_KEY_NAME = "producer_id";
- private static final String EPOCH_KEY_NAME = "producer_epoch";
+ private static final String PRODUCER_ID_KEY_NAME = "producer_id";
+ private static final String PRODUCER_EPOCH_KEY_NAME = "producer_epoch";
private static final String TRANSACTION_RESULT_KEY_NAME = "transaction_result";
public static class Builder extends AbstractRequest.Builder<EndTxnRequest> {
@@ -64,8 +64,8 @@ public class EndTxnRequest extends AbstractRequest {
public EndTxnRequest(Struct struct, short version) {
super(version);
this.transactionalId = struct.getString(TRANSACTIONAL_ID_KEY_NAME);
- this.producerId = struct.getLong(PID_KEY_NAME);
- this.producerEpoch = struct.getShort(EPOCH_KEY_NAME);
+ this.producerId = struct.getLong(PRODUCER_ID_KEY_NAME);
+ this.producerEpoch = struct.getShort(PRODUCER_EPOCH_KEY_NAME);
this.result = TransactionResult.forId(struct.getBoolean(TRANSACTION_RESULT_KEY_NAME));
}
@@ -89,8 +89,8 @@ public class EndTxnRequest extends AbstractRequest {
protected Struct toStruct() {
Struct struct = new Struct(ApiKeys.END_TXN.requestSchema(version()));
struct.set(TRANSACTIONAL_ID_KEY_NAME, transactionalId);
- struct.set(PID_KEY_NAME, producerId);
- struct.set(EPOCH_KEY_NAME, producerEpoch);
+ struct.set(PRODUCER_ID_KEY_NAME, producerId);
+ struct.set(PRODUCER_EPOCH_KEY_NAME, producerEpoch);
struct.set(TRANSACTION_RESULT_KEY_NAME, result.id);
return struct;
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/3e666900/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
index db12d26..0cb87b5 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
@@ -56,7 +56,7 @@ public class FetchResponse extends AbstractResponse {
private static final String RECORD_SET_KEY_NAME = "record_set";
// aborted transaction field names
- private static final String PID_KEY_NAME = "producer_id";
+ private static final String PRODUCER_ID_KEY_NAME = "producer_id";
private static final String FIRST_OFFSET_KEY_NAME = "first_offset";
private static final int DEFAULT_THROTTLE_TIME = 0;
@@ -211,7 +211,7 @@ public class FetchResponse extends AbstractResponse {
abortedTransactions = new ArrayList<>(abortedTransactionsArray.length);
for (Object abortedTransactionObj : abortedTransactionsArray) {
Struct abortedTransactionStruct = (Struct) abortedTransactionObj;
- long producerId = abortedTransactionStruct.getLong(PID_KEY_NAME);
+ long producerId = abortedTransactionStruct.getLong(PRODUCER_ID_KEY_NAME);
long firstOffset = abortedTransactionStruct.getLong(FIRST_OFFSET_KEY_NAME);
abortedTransactions.add(new AbortedTransaction(producerId, firstOffset));
}
@@ -339,7 +339,7 @@ public class FetchResponse extends AbstractResponse {
List<Struct> abortedTransactionStructs = new ArrayList<>(fetchPartitionData.abortedTransactions.size());
for (AbortedTransaction abortedTransaction : fetchPartitionData.abortedTransactions) {
Struct abortedTransactionStruct = partitionDataHeader.instance(ABORTED_TRANSACTIONS_KEY_NAME);
- abortedTransactionStruct.set(PID_KEY_NAME, abortedTransaction.producerId);
+ abortedTransactionStruct.set(PRODUCER_ID_KEY_NAME, abortedTransaction.producerId);
abortedTransactionStruct.set(FIRST_OFFSET_KEY_NAME, abortedTransaction.firstOffset);
abortedTransactionStructs.add(abortedTransactionStruct);
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/3e666900/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitRequest.java
index 8778b49..3f3024f 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitRequest.java
@@ -28,8 +28,8 @@ import java.util.Map;
public class TxnOffsetCommitRequest extends AbstractRequest {
private static final String CONSUMER_GROUP_ID_KEY_NAME = "consumer_group_id";
- private static final String PID_KEY_NAME = "producer_id";
- private static final String EPOCH_KEY_NAME = "producer_epoch";
+ private static final String PRODUCER_ID_KEY_NAME = "producer_id";
+ private static final String PRODUCER_EPOCH_KEY_NAME = "producer_epoch";
private static final String RETENTION_TIME_KEY_NAME = "retention_time";
private static final String TOPIC_PARTITIONS_KEY_NAME = "topics";
private static final String TOPIC_KEY_NAME = "topic";
@@ -84,8 +84,8 @@ public class TxnOffsetCommitRequest extends AbstractRequest {
public TxnOffsetCommitRequest(Struct struct, short version) {
super(version);
this.consumerGroupId = struct.getString(CONSUMER_GROUP_ID_KEY_NAME);
- this.producerId = struct.getLong(PID_KEY_NAME);
- this.producerEpoch = struct.getShort(EPOCH_KEY_NAME);
+ this.producerId = struct.getLong(PRODUCER_ID_KEY_NAME);
+ this.producerEpoch = struct.getShort(PRODUCER_EPOCH_KEY_NAME);
this.retentionTimeMs = struct.getLong(RETENTION_TIME_KEY_NAME);
Map<TopicPartition, CommittedOffset> offsets = new HashMap<>();
@@ -128,8 +128,8 @@ public class TxnOffsetCommitRequest extends AbstractRequest {
protected Struct toStruct() {
Struct struct = new Struct(ApiKeys.TXN_OFFSET_COMMIT.requestSchema(version()));
struct.set(CONSUMER_GROUP_ID_KEY_NAME, consumerGroupId);
- struct.set(PID_KEY_NAME, producerId);
- struct.set(EPOCH_KEY_NAME, producerEpoch);
+ struct.set(PRODUCER_ID_KEY_NAME, producerId);
+ struct.set(PRODUCER_EPOCH_KEY_NAME, producerEpoch);
struct.set(RETENTION_TIME_KEY_NAME, retentionTimeMs);
Map<String, Map<Integer, CommittedOffset>> mappedPartitionOffsets = CollectionUtils.groupDataByTopic(offsets);
http://git-wip-us.apache.org/repos/asf/kafka/blob/3e666900/clients/src/main/java/org/apache/kafka/common/requests/WriteTxnMarkersRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/WriteTxnMarkersRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/WriteTxnMarkersRequest.java
index 0c09880..cf2c9fc 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/WriteTxnMarkersRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/WriteTxnMarkersRequest.java
@@ -33,8 +33,8 @@ public class WriteTxnMarkersRequest extends AbstractRequest {
private static final String COORDINATOR_EPOCH_KEY_NAME = "coordinator_epoch";
private static final String TXN_MARKER_ENTRY_KEY_NAME = "transaction_markers";
- private static final String PID_KEY_NAME = "producer_id";
- private static final String EPOCH_KEY_NAME = "producer_epoch";
+ private static final String PRODUCER_ID_KEY_NAME = "producer_id";
+ private static final String PRODUCER_EPOCH_KEY_NAME = "producer_epoch";
private static final String TRANSACTION_RESULT_KEY_NAME = "transaction_result";
private static final String TOPIC_PARTITIONS_KEY_NAME = "topics";
private static final String TOPIC_KEY_NAME = "topic";
@@ -138,8 +138,8 @@ public class WriteTxnMarkersRequest extends AbstractRequest {
for (Object markerObj : markersArray) {
Struct markerStruct = (Struct) markerObj;
- long producerId = markerStruct.getLong(PID_KEY_NAME);
- short producerEpoch = markerStruct.getShort(EPOCH_KEY_NAME);
+ long producerId = markerStruct.getLong(PRODUCER_ID_KEY_NAME);
+ short producerEpoch = markerStruct.getShort(PRODUCER_EPOCH_KEY_NAME);
int coordinatorEpoch = markerStruct.getInt(COORDINATOR_EPOCH_KEY_NAME);
TransactionResult result = TransactionResult.forId(markerStruct.getBoolean(TRANSACTION_RESULT_KEY_NAME));
@@ -172,8 +172,8 @@ public class WriteTxnMarkersRequest extends AbstractRequest {
int i = 0;
for (TxnMarkerEntry entry : markers) {
Struct markerStruct = struct.instance(TXN_MARKER_ENTRY_KEY_NAME);
- markerStruct.set(PID_KEY_NAME, entry.producerId);
- markerStruct.set(EPOCH_KEY_NAME, entry.producerEpoch);
+ markerStruct.set(PRODUCER_ID_KEY_NAME, entry.producerId);
+ markerStruct.set(PRODUCER_EPOCH_KEY_NAME, entry.producerEpoch);
markerStruct.set(COORDINATOR_EPOCH_KEY_NAME, entry.coordinatorEpoch);
markerStruct.set(TRANSACTION_RESULT_KEY_NAME, entry.result.id);
http://git-wip-us.apache.org/repos/asf/kafka/blob/3e666900/clients/src/main/java/org/apache/kafka/common/requests/WriteTxnMarkersResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/WriteTxnMarkersResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/WriteTxnMarkersResponse.java
index 00133a6..06f6662 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/WriteTxnMarkersResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/WriteTxnMarkersResponse.java
@@ -29,7 +29,7 @@ import java.util.Map;
public class WriteTxnMarkersResponse extends AbstractResponse {
private static final String TXN_MARKER_ENTRY_KEY_NAME = "transaction_markers";
- private static final String PID_KEY_NAME = "producer_id";
+ private static final String PRODUCER_ID_KEY_NAME = "producer_id";
private static final String TOPIC_PARTITIONS_KEY_NAME = "topics";
private static final String PARTITIONS_KEY_NAME = "partitions";
private static final String TOPIC_KEY_NAME = "topic";
@@ -62,7 +62,7 @@ public class WriteTxnMarkersResponse extends AbstractResponse {
for (Object responseObj : responseArray) {
Struct responseStruct = (Struct) responseObj;
- long producerId = responseStruct.getLong(PID_KEY_NAME);
+ long producerId = responseStruct.getLong(PRODUCER_ID_KEY_NAME);
Map<TopicPartition, Errors> errorPerPartition = new HashMap<>();
Object[] topicPartitionsArray = responseStruct.getArray(TOPIC_PARTITIONS_KEY_NAME);
@@ -90,7 +90,7 @@ public class WriteTxnMarkersResponse extends AbstractResponse {
int k = 0;
for (Map.Entry<Long, Map<TopicPartition, Errors>> responseEntry : errors.entrySet()) {
Struct responseStruct = struct.instance(TXN_MARKER_ENTRY_KEY_NAME);
- responseStruct.set(PID_KEY_NAME, responseEntry.getKey());
+ responseStruct.set(PRODUCER_ID_KEY_NAME, responseEntry.getKey());
Map<TopicPartition, Errors> partitionAndErrors = responseEntry.getValue();
Map<String, Map<Integer, Errors>> mappedPartitions = CollectionUtils.groupDataByTopic(partitionAndErrors);
http://git-wip-us.apache.org/repos/asf/kafka/blob/3e666900/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
index bb13dcb..1321fba 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
@@ -387,8 +387,8 @@ public class SenderTest {
}, new InitProducerIdResponse(0, Errors.NONE, producerId, (short) 0));
sender.run(time.milliseconds());
assertTrue(transactionManager.hasProducerId());
- assertEquals(producerId, transactionManager.pidAndEpoch().producerId);
- assertEquals((short) 0, transactionManager.pidAndEpoch().epoch);
+ assertEquals(producerId, transactionManager.producerIdAndEpoch().producerId);
+ assertEquals((short) 0, transactionManager.producerIdAndEpoch().epoch);
}
@Test
http://git-wip-us.apache.org/repos/asf/kafka/blob/3e666900/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
index 53686e2..c0acfec 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
@@ -286,8 +286,8 @@ public class TransactionManagerTest {
assertTrue(initPidResult.isCompleted()); // The future should only return after the second round of retries succeed.
assertTrue(transactionManager.hasProducerId());
- assertEquals(pid, transactionManager.pidAndEpoch().producerId);
- assertEquals(epoch, transactionManager.pidAndEpoch().epoch);
+ assertEquals(pid, transactionManager.producerIdAndEpoch().producerId);
+ assertEquals(epoch, transactionManager.producerIdAndEpoch().epoch);
}
@Test
http://git-wip-us.apache.org/repos/asf/kafka/blob/3e666900/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala b/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala
index 8122694..2f76d63 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala
@@ -146,7 +146,7 @@ private[group] class GroupMetadata(val groupId: String, initialState: GroupState
private val members = new mutable.HashMap[String, MemberMetadata]
private val offsets = new mutable.HashMap[TopicPartition, OffsetAndMetadata]
private val pendingOffsetCommits = new mutable.HashMap[TopicPartition, OffsetAndMetadata]
- // A map from a PID to the open offset commits for that pid.
+ // A map from a producer id to the open offset commits for that producer id.
private val pendingTransactionalOffsetCommits = new mutable.HashMap[Long, mutable.Map[TopicPartition, OffsetAndMetadata]]()
private var receivedTransactionalOffsetCommits = false
private var receivedConsumerOffsetCommits = false
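The renamed comment describes a two-level map: producer id -> (partition -> pending offset commit). A simplified sketch of how such staged commits might be kept separate from the committed view until the transaction completes; the types are illustrative stand-ins, not GroupMetadata's internals:

import java.util.HashMap;
import java.util.Map;

public class PendingTxnOffsetsSketch {
    // producer id -> (partition -> staged offset); simplified stand-in types
    static final Map<Long, Map<String, Long>> pendingByProducerId = new HashMap<>();
    static final Map<String, Long> committedOffsets = new HashMap<>();

    static void stage(long producerId, String topicPartition, long offset) {
        pendingByProducerId.computeIfAbsent(producerId, id -> new HashMap<>())
                           .put(topicPartition, offset);
    }

    static void completeTxn(long producerId, boolean committed) {
        Map<String, Long> pending = pendingByProducerId.remove(producerId);
        if (committed && pending != null)
            committedOffsets.putAll(pending); // visible only after commit
    }

    public static void main(String[] args) {
        stage(4000L, "topic-0", 42L);
        completeTxn(4000L, true);
        System.out.println(committedOffsets); // {topic-0=42}
    }
}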
http://git-wip-us.apache.org/repos/asf/kafka/blob/3e666900/core/src/main/scala/kafka/coordinator/transaction/TransactionLog.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionLog.scala b/core/src/main/scala/kafka/coordinator/transaction/TransactionLog.scala
index a180502..d0c9e87 100644
--- a/core/src/main/scala/kafka/coordinator/transaction/TransactionLog.scala
+++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionLog.scala
@@ -30,12 +30,12 @@ import org.apache.kafka.common.record.CompressionType
import scala.collection.mutable
/*
- * Messages stored for the transaction topic represent the pid and transactional status of the corresponding
+ * Messages stored for the transaction topic represent the producer id and transactional status of the corresponding
* transactional id, which have versions for both the key and value fields. Key and value
* versions are used to evolve the message formats:
*
* key version 0: [transactionalId]
- * -> value version 0: [pid, epoch, expire_timestamp, status, [topic [partition], timestamp ]
+ * -> value version 0: [producer_id, producer_epoch, expire_timestamp, status, [topic [partition]], timestamp]
*/
object TransactionLog {
@@ -55,69 +55,73 @@ object TransactionLog {
val EnforcedRequiredAcks: Short = (-1).toShort
// log message formats
- private val TXN_ID_KEY = "transactional_id"
-
- private val PID_KEY = "pid"
- private val EPOCH_KEY = "epoch"
- private val TXN_TIMEOUT_KEY = "transaction_timeout"
- private val TXN_STATUS_KEY = "transaction_status"
- private val TXN_PARTITIONS_KEY = "transaction_partitions"
- private val TXN_ENTRY_TIMESTAMP_FIELD = "transaction_entry_timestamp"
- private val TXN_START_TIMESTAMP_FIELD = "transaction_start_timestamp"
- private val TOPIC_KEY = "topic"
- private val PARTITION_IDS_KEY = "partition_ids"
-
- private val KEY_SCHEMA_V0 = new Schema(new Field(TXN_ID_KEY, STRING))
- private val KEY_SCHEMA_TXN_ID_FIELD = KEY_SCHEMA_V0.get(TXN_ID_KEY)
-
- private val VALUE_PARTITIONS_SCHEMA = new Schema(new Field(TOPIC_KEY, STRING),
- new Field(PARTITION_IDS_KEY, new ArrayOf(INT32)))
- private val PARTITIONS_SCHEMA_TOPIC_FIELD = VALUE_PARTITIONS_SCHEMA.get(TOPIC_KEY)
- private val PARTITIONS_SCHEMA_PARTITION_IDS_FIELD = VALUE_PARTITIONS_SCHEMA.get(PARTITION_IDS_KEY)
-
- private val VALUE_SCHEMA_V0 = new Schema(new Field(PID_KEY, INT64, "Producer id in use by the transactional id."),
- new Field(EPOCH_KEY, INT16, "Epoch associated with the producer id"),
- new Field(TXN_TIMEOUT_KEY, INT32, "Transaction timeout in milliseconds"),
- new Field(TXN_STATUS_KEY, INT8,
- "TransactionState the transaction is in"),
- new Field(TXN_PARTITIONS_KEY, ArrayOf.nullable(VALUE_PARTITIONS_SCHEMA),
- "Set of partitions involved in the transaction"),
- new Field(TXN_ENTRY_TIMESTAMP_FIELD, INT64, "Time the transaction was last updated"),
- new Field(TXN_START_TIMESTAMP_FIELD, INT64, "Time the transaction was started"))
- private val VALUE_SCHEMA_PID_FIELD = VALUE_SCHEMA_V0.get(PID_KEY)
- private val VALUE_SCHEMA_EPOCH_FIELD = VALUE_SCHEMA_V0.get(EPOCH_KEY)
- private val VALUE_SCHEMA_TXN_TIMEOUT_FIELD = VALUE_SCHEMA_V0.get(TXN_TIMEOUT_KEY)
- private val VALUE_SCHEMA_TXN_STATUS_FIELD = VALUE_SCHEMA_V0.get(TXN_STATUS_KEY)
- private val VALUE_SCHEMA_TXN_PARTITIONS_FIELD = VALUE_SCHEMA_V0.get(TXN_PARTITIONS_KEY)
- private val VALUE_SCHEMA_TXN_ENTRY_TIMESTAMP_FIELD = VALUE_SCHEMA_V0.get(TXN_ENTRY_TIMESTAMP_FIELD)
- private val VALUE_SCHEMA_TXN_START_TIMESTAMP_FIELD = VALUE_SCHEMA_V0.get(TXN_START_TIMESTAMP_FIELD)
-
- private val KEY_SCHEMAS = Map(
- 0 -> KEY_SCHEMA_V0)
-
- private val VALUE_SCHEMAS = Map(
- 0 -> VALUE_SCHEMA_V0)
-
- private val CURRENT_KEY_SCHEMA_VERSION = 0.toShort
- private val CURRENT_VALUE_SCHEMA_VERSION = 0.toShort
-
- private val CURRENT_KEY_SCHEMA = schemaForKey(CURRENT_KEY_SCHEMA_VERSION)
-
- private val CURRENT_VALUE_SCHEMA = schemaForValue(CURRENT_VALUE_SCHEMA_VERSION)
+
+ private object KeySchema {
+ private val TXN_ID_KEY = "transactional_id"
+
+ private val V0 = new Schema(new Field(TXN_ID_KEY, STRING))
+ private val SCHEMAS = Map(0 -> V0)
+
+ val CURRENT_VERSION = 0.toShort
+ val CURRENT = schemaForKey(CURRENT_VERSION)
+
+ val TXN_ID_FIELD = V0.get(TXN_ID_KEY)
+
+ def ofVersion(version: Int): Option[Schema] = SCHEMAS.get(version)
+ }
+
+ private object ValueSchema {
+ private val ProducerIdKey = "producer_id"
+ private val ProducerEpochKey = "producer_epoch"
+ private val TxnTimeoutKey = "transaction_timeout"
+ private val TxnStatusKey = "transaction_status"
+ private val TxnPartitionsKey = "transaction_partitions"
+ private val TxnEntryTimestampKey = "transaction_entry_timestamp"
+ private val TxnStartTimestampKey = "transaction_start_timestamp"
+
+ private val PartitionIdsKey = "partition_ids"
+ private val TopicKey = "topic"
+ private val PartitionsSchema = new Schema(new Field(TopicKey, STRING),
+ new Field(PartitionIdsKey, new ArrayOf(INT32)))
+
+ private val V0 = new Schema(new Field(ProducerIdKey, INT64, "Producer id in use by the transactional id."),
+ new Field(ProducerEpochKey, INT16, "Epoch associated with the producer id"),
+ new Field(TxnTimeoutKey, INT32, "Transaction timeout in milliseconds"),
+ new Field(TxnStatusKey, INT8,
+ "TransactionState the transaction is in"),
+ new Field(TxnPartitionsKey, ArrayOf.nullable(PartitionsSchema),
+ "Set of partitions involved in the transaction"),
+ new Field(TxnEntryTimestampKey, INT64, "Time the transaction was last updated"),
+ new Field(TxnStartTimestampKey, INT64, "Time the transaction was started"))
+
+ private val Schemas = Map(0 -> V0)
+
+ val CurrentVersion = 0.toShort
+ val Current = schemaForValue(CurrentVersion)
+
+ val ProducerIdField = V0.get(ProducerIdKey)
+ val ProducerEpochField = V0.get(ProducerEpochKey)
+ val TxnTimeoutField = V0.get(TxnTimeoutKey)
+ val TxnStatusField = V0.get(TxnStatusKey)
+ val TxnPartitionsField = V0.get(TxnPartitionsKey)
+ val TxnEntryTimestampField = V0.get(TxnEntryTimestampKey)
+ val TxnStartTimestampField = V0.get(TxnStartTimestampKey)
+
+ val PartitionsTopicField = PartitionsSchema.get(TopicKey)
+ val PartitionIdsField = PartitionsSchema.get(PartitionIdsKey)
+
+ def ofVersion(version: Int): Option[Schema] = Schemas.get(version)
+ }
private def schemaForKey(version: Int) = {
- val schemaOpt = KEY_SCHEMAS.get(version)
- schemaOpt match {
- case Some(schema) => schema
- case _ => throw new KafkaException(s"Unknown transaction log message key schema version $version")
+ KeySchema.ofVersion(version).getOrElse {
+ throw new KafkaException(s"Unknown transaction log message key schema version $version")
}
}
private def schemaForValue(version: Int) = {
- val schemaOpt = VALUE_SCHEMAS.get(version)
- schemaOpt match {
- case Some(schema) => schema
- case _ => throw new KafkaException(s"Unknown transaction log message value schema version $version")
+ ValueSchema.ofVersion(version).getOrElse {
+ throw new KafkaException(s"Unknown transaction log message value schema version $version")
}
}
@@ -127,11 +131,12 @@ object TransactionLog {
* @return key bytes
*/
private[coordinator] def keyToBytes(transactionalId: String): Array[Byte] = {
- val key = new Struct(CURRENT_KEY_SCHEMA)
- key.set(KEY_SCHEMA_TXN_ID_FIELD, transactionalId)
+ import KeySchema._
+ val key = new Struct(CURRENT)
+ key.set(TXN_ID_FIELD, transactionalId)
val byteBuffer = ByteBuffer.allocate(2 /* version */ + key.sizeOf)
- byteBuffer.putShort(CURRENT_KEY_SCHEMA_VERSION)
+ byteBuffer.putShort(CURRENT_VERSION)
key.writeTo(byteBuffer)
byteBuffer.array()
}
@@ -142,37 +147,38 @@ object TransactionLog {
* @return value payload bytes
*/
private[coordinator] def valueToBytes(txnMetadata: TransactionMetadataTransition): Array[Byte] = {
- val value = new Struct(CURRENT_VALUE_SCHEMA)
- value.set(VALUE_SCHEMA_PID_FIELD, txnMetadata.producerId)
- value.set(VALUE_SCHEMA_EPOCH_FIELD, txnMetadata.producerEpoch)
- value.set(VALUE_SCHEMA_TXN_TIMEOUT_FIELD, txnMetadata.txnTimeoutMs)
- value.set(VALUE_SCHEMA_TXN_STATUS_FIELD, txnMetadata.txnState.byte)
- value.set(VALUE_SCHEMA_TXN_ENTRY_TIMESTAMP_FIELD, txnMetadata.txnLastUpdateTimestamp)
- value.set(VALUE_SCHEMA_TXN_START_TIMESTAMP_FIELD, txnMetadata.txnStartTimestamp)
+ import ValueSchema._
+ val value = new Struct(Current)
+ value.set(ProducerIdField, txnMetadata.producerId)
+ value.set(ProducerEpochField, txnMetadata.producerEpoch)
+ value.set(TxnTimeoutField, txnMetadata.txnTimeoutMs)
+ value.set(TxnStatusField, txnMetadata.txnState.byte)
+ value.set(TxnEntryTimestampField, txnMetadata.txnLastUpdateTimestamp)
+ value.set(TxnStartTimestampField, txnMetadata.txnStartTimestamp)
if (txnMetadata.txnState == Empty) {
if (txnMetadata.topicPartitions.nonEmpty)
throw new IllegalStateException(s"Transaction is not expected to have any partitions since its state is ${txnMetadata.txnState}: $txnMetadata")
- value.set(VALUE_SCHEMA_TXN_PARTITIONS_FIELD, null)
+ value.set(TxnPartitionsField, null)
} else {
// first group the topic partitions by their topic names
val topicAndPartitions = txnMetadata.topicPartitions.groupBy(_.topic())
val partitionArray = topicAndPartitions.map { case(topic, partitions) =>
- val topicPartitionsStruct = value.instance(VALUE_SCHEMA_TXN_PARTITIONS_FIELD)
+ val topicPartitionsStruct = value.instance(TxnPartitionsField)
val partitionIds: Array[Integer] = partitions.map(topicPartition => Integer.valueOf(topicPartition.partition())).toArray
- topicPartitionsStruct.set(PARTITIONS_SCHEMA_TOPIC_FIELD, topic)
- topicPartitionsStruct.set(PARTITIONS_SCHEMA_PARTITION_IDS_FIELD, partitionIds)
+ topicPartitionsStruct.set(PartitionsTopicField, topic)
+ topicPartitionsStruct.set(PartitionIdsField, partitionIds)
topicPartitionsStruct
}
- value.set(VALUE_SCHEMA_TXN_PARTITIONS_FIELD, partitionArray.toArray)
+ value.set(TxnPartitionsField, partitionArray.toArray)
}
val byteBuffer = ByteBuffer.allocate(2 /* version */ + value.sizeOf)
- byteBuffer.putShort(CURRENT_VALUE_SCHEMA_VERSION)
+ byteBuffer.putShort(CurrentVersion)
value.writeTo(byteBuffer)
byteBuffer.array()
}
@@ -187,8 +193,8 @@ object TransactionLog {
val keySchema = schemaForKey(version)
val key = keySchema.read(buffer)
- if (version == CURRENT_KEY_SCHEMA_VERSION) {
- val transactionalId = key.getString(KEY_SCHEMA_TXN_ID_FIELD)
+ if (version == KeySchema.CURRENT_VERSION) {
+ val transactionalId = key.getString(KeySchema.TXN_ID_FIELD)
TxnKey(version, transactionalId)
} else {
@@ -197,37 +203,38 @@ object TransactionLog {
}
/**
- * Decodes the transaction log messages' payload and retrieves pid metadata from it
+ * Decodes the transaction log messages' payload and retrieves the transaction metadata from it
*
- * @return a pid metadata object from the message
+ * @return a transaction metadata object from the message
*/
def readMessageValue(buffer: ByteBuffer): TransactionMetadata = {
if (buffer == null) { // tombstone
null
} else {
+ import ValueSchema._
val version = buffer.getShort
val valueSchema = schemaForValue(version)
val value = valueSchema.read(buffer)
- if (version == CURRENT_VALUE_SCHEMA_VERSION) {
- val pid = value.get(VALUE_SCHEMA_PID_FIELD).asInstanceOf[Long]
- val epoch = value.get(VALUE_SCHEMA_EPOCH_FIELD).asInstanceOf[Short]
- val timeout = value.get(VALUE_SCHEMA_TXN_TIMEOUT_FIELD).asInstanceOf[Int]
+ if (version == CurrentVersion) {
+ val producerId = value.getLong(ProducerIdField)
+ val epoch = value.getShort(ProducerEpochField)
+ val timeout = value.getInt(TxnTimeoutField)
- val stateByte = value.getByte(VALUE_SCHEMA_TXN_STATUS_FIELD)
+ val stateByte = value.getByte(TxnStatusField)
val state = TransactionMetadata.byteToState(stateByte)
- val entryTimestamp = value.get(VALUE_SCHEMA_TXN_ENTRY_TIMESTAMP_FIELD).asInstanceOf[Long]
- val startTimestamp = value.get(VALUE_SCHEMA_TXN_START_TIMESTAMP_FIELD).asInstanceOf[Long]
+ val entryTimestamp = value.getLong(TxnEntryTimestampField)
+ val startTimestamp = value.getLong(TxnStartTimestampField)
- val transactionMetadata = new TransactionMetadata(pid, epoch, timeout, state, mutable.Set.empty[TopicPartition],startTimestamp, entryTimestamp)
+ val transactionMetadata = new TransactionMetadata(producerId, epoch, timeout, state, mutable.Set.empty[TopicPartition], startTimestamp, entryTimestamp)
if (!state.equals(Empty)) {
- val topicPartitionArray = value.getArray(VALUE_SCHEMA_TXN_PARTITIONS_FIELD)
+ val topicPartitionArray = value.getArray(TxnPartitionsField)
topicPartitionArray.foreach { memberMetadataObj =>
val memberMetadata = memberMetadataObj.asInstanceOf[Struct]
- val topic = memberMetadata.get(PARTITIONS_SCHEMA_TOPIC_FIELD).asInstanceOf[String]
- val partitionIdArray = memberMetadata.getArray(PARTITIONS_SCHEMA_PARTITION_IDS_FIELD)
+ val topic = memberMetadata.getString(PartitionsTopicField)
+ val partitionIdArray = memberMetadata.getArray(PartitionIdsField)
val topicPartitions = partitionIdArray.map { partitionIdObj =>
val partitionId = partitionIdObj.asInstanceOf[Integer]
@@ -252,12 +259,12 @@ object TransactionLog {
case txnKey: TxnKey =>
val transactionalId = txnKey.transactionalId
val value = consumerRecord.value
- val pidMetadata =
+ val producerIdMetadata =
if (value == null) "NULL"
else readMessageValue(ByteBuffer.wrap(value))
output.write(transactionalId.getBytes(StandardCharsets.UTF_8))
output.write("::".getBytes(StandardCharsets.UTF_8))
- output.write(pidMetadata.toString.getBytes(StandardCharsets.UTF_8))
+ output.write(producerIdMetadata.toString.getBytes(StandardCharsets.UTF_8))
output.write("\n".getBytes(StandardCharsets.UTF_8))
case _ => // no-op
}
@@ -265,7 +272,7 @@ object TransactionLog {
}
}
-trait BaseKey{
+sealed trait BaseKey {
def version: Short
def transactionalId: Any
}
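Both keyToBytes and valueToBytes above share one framing: a two-byte schema version followed by the struct payload, so readMessageKey/readMessageValue can dispatch on the version before decoding. A small sketch of that framing, with an opaque byte payload standing in for the real Struct encoding:

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class VersionedPayloadSketch {
    static byte[] toBytes(short version, byte[] payload) {
        ByteBuffer buffer = ByteBuffer.allocate(2 + payload.length);
        buffer.putShort(version); // two-byte schema version first
        buffer.put(payload);      // then the serialized struct
        return buffer.array();
    }

    public static void main(String[] args) {
        byte[] message = toBytes((short) 0, "my-txn-id".getBytes(StandardCharsets.UTF_8));
        ByteBuffer buffer = ByteBuffer.wrap(message);
        short version = buffer.getShort();
        if (version != 0) // dispatch on version before decoding
            throw new IllegalStateException("Unknown transaction log schema version " + version);
        byte[] payload = new byte[buffer.remaining()];
        buffer.get(payload);
        System.out.println(new String(payload, StandardCharsets.UTF_8));
    }
}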
http://git-wip-us.apache.org/repos/asf/kafka/blob/3e666900/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerRequestCompletionHandler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerRequestCompletionHandler.scala b/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerRequestCompletionHandler.scala
index 5978a97..39c7914 100644
--- a/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerRequestCompletionHandler.scala
+++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerRequestCompletionHandler.scala
@@ -56,7 +56,7 @@ class TransactionMarkerRequestCompletionHandler(brokerId: Int,
val errors = writeTxnMarkerResponse.errors(txnMarker.producerId)
if (errors == null)
- throw new IllegalStateException(s"WriteTxnMarkerResponse does not contain expected error map for pid ${txnMarker.producerId}")
+ throw new IllegalStateException(s"WriteTxnMarkerResponse does not contain expected error map for producer id ${txnMarker.producerId}")
txnStateManager.getTransactionState(transactionalId) match {
case None =>
http://git-wip-us.apache.org/repos/asf/kafka/blob/3e666900/core/src/main/scala/kafka/coordinator/transaction/TransactionMetadata.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionMetadata.scala b/core/src/main/scala/kafka/coordinator/transaction/TransactionMetadata.scala
index a76617e..d05676b 100644
--- a/core/src/main/scala/kafka/coordinator/transaction/TransactionMetadata.scala
+++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionMetadata.scala
@@ -70,9 +70,9 @@ private[transaction] case object CompleteCommit extends TransactionState { val b
private[transaction] case object CompleteAbort extends TransactionState { val byte: Byte = 5 }
private[transaction] object TransactionMetadata {
- def apply(pid: Long, epoch: Short, txnTimeoutMs: Int, timestamp: Long) = new TransactionMetadata(pid, epoch, txnTimeoutMs, Empty, collection.mutable.Set.empty[TopicPartition], timestamp, timestamp)
+ def apply(producerId: Long, epoch: Short, txnTimeoutMs: Int, timestamp: Long) = new TransactionMetadata(producerId, epoch, txnTimeoutMs, Empty, collection.mutable.Set.empty[TopicPartition], timestamp, timestamp)
- def apply(pid: Long, epoch: Short, txnTimeoutMs: Int, state: TransactionState, timestamp: Long) = new TransactionMetadata(pid, epoch, txnTimeoutMs, state, collection.mutable.Set.empty[TopicPartition], timestamp, timestamp)
+ def apply(producerId: Long, epoch: Short, txnTimeoutMs: Int, state: TransactionState, timestamp: Long) = new TransactionMetadata(producerId, epoch, txnTimeoutMs, state, collection.mutable.Set.empty[TopicPartition], timestamp, timestamp)
def byteToState(byte: Byte): TransactionState = {
byte match {
@@ -212,7 +212,7 @@ private[transaction] class TransactionMetadata(val producerId: Long,
// metadata transition is valid only if all the following conditions are met:
//
// 1. the new state is already indicated in the pending state.
- // 2. the pid is the same (i.e. this field should never be changed)
+ // 2. the producerId is the same (i.e. this field should never be changed)
// 3. the epoch should be either the same value or old value + 1.
// 4. the last update time is no smaller than the old value.
// 5. the old partitions set is a subset of the new partitions set.
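The transition rules in this comment translate directly into a conjunction of checks. A hedged sketch with simplified parameters in place of TransactionMetadata's fields; rule 1 (the pending-state check) is omitted for brevity:

import java.util.Set;

public class TransitionCheckSketch {
    static boolean isValidTransition(long oldProducerId, long newProducerId,
                                     short oldEpoch, short newEpoch,
                                     long oldUpdateTime, long newUpdateTime,
                                     Set<Integer> oldPartitions, Set<Integer> newPartitions) {
        return newProducerId == oldProducerId                     // rule 2: producer id never changes
            && (newEpoch == oldEpoch || newEpoch == oldEpoch + 1) // rule 3: epoch same or bumped by one
            && newUpdateTime >= oldUpdateTime                     // rule 4: update time never moves back
            && newPartitions.containsAll(oldPartitions);          // rule 5: partitions only grow
    }

    public static void main(String[] args) {
        System.out.println(isValidTransition(4000L, 4000L, (short) 1, (short) 2,
                100L, 150L, Set.of(0), Set.of(0, 1))); // true
    }
}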
http://git-wip-us.apache.org/repos/asf/kafka/blob/3e666900/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala b/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
index 1106e7c..cf41fc3 100644
--- a/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
+++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
@@ -104,7 +104,7 @@ class TransactionStateManager(brokerId: Int,
}
def enablePidExpiration() {
- // TODO: add pid expiration logic
+ // TODO: add producer id expiration logic
}
/**
http://git-wip-us.apache.org/repos/asf/kafka/blob/3e666900/core/src/main/scala/kafka/log/Log.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index f499aa8..a4796d1 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -114,8 +114,8 @@ case class CompletedTxn(producerId: Long, firstOffset: Long, lastOffset: Long, i
* @param recoveryPoint The offset at which to begin recovery--i.e. the first offset which has not been flushed to disk
* @param scheduler The thread pool scheduler used for background actions
* @param time The time instance used for checking the clock
- * @param maxPidExpirationMs The maximum amount of time to wait before a PID is considered expired
- * @param pidExpirationCheckIntervalMs How often to check for PIDs which need to be expired
+ * @param maxProducerIdExpirationMs The maximum amount of time to wait before a producer id is considered expired
+ * @param producerIdExpirationCheckIntervalMs How often to check for producer ids which need to be expired
*/
@threadsafe
class Log(@volatile var dir: File,
@@ -124,8 +124,8 @@ class Log(@volatile var dir: File,
@volatile var recoveryPoint: Long = 0L,
scheduler: Scheduler,
time: Time = Time.SYSTEM,
- val maxPidExpirationMs: Int = 60 * 60 * 1000,
- val pidExpirationCheckIntervalMs: Int = 10 * 60 * 1000) extends Logging with KafkaMetricsGroup {
+ val maxProducerIdExpirationMs: Int = 60 * 60 * 1000,
+ val producerIdExpirationCheckIntervalMs: Int = 10 * 60 * 1000) extends Logging with KafkaMetricsGroup {
import kafka.log.Log._
@@ -149,7 +149,7 @@ class Log(@volatile var dir: File,
/* The earliest offset which is part of an incomplete transaction. This is used to compute the LSO. */
@volatile var firstUnstableOffset: Option[LogOffsetMetadata] = None
- private val producerStateManager = new ProducerStateManager(topicPartition, dir, maxPidExpirationMs)
+ private val producerStateManager = new ProducerStateManager(topicPartition, dir, maxProducerIdExpirationMs)
/* the actual segments of the log */
private val segments: ConcurrentNavigableMap[java.lang.Long, LogSegment] = new ConcurrentSkipListMap[java.lang.Long, LogSegment]
@@ -207,7 +207,7 @@ class Log(@volatile var dir: File,
lock synchronized {
producerStateManager.removeExpiredProducers(time.milliseconds)
}
- }, period = pidExpirationCheckIntervalMs, unit = TimeUnit.MILLISECONDS)
+ }, period = producerIdExpirationCheckIntervalMs, unit = TimeUnit.MILLISECONDS)
/** The name of this log */
def name = dir.getName()
@@ -306,7 +306,7 @@ class Log(@volatile var dir: File,
}
private def recoverSegment(segment: LogSegment, leaderEpochCache: Option[LeaderEpochCache] = None): Int = lock synchronized {
- val stateManager = new ProducerStateManager(topicPartition, dir, maxPidExpirationMs)
+ val stateManager = new ProducerStateManager(topicPartition, dir, maxProducerIdExpirationMs)
stateManager.truncateAndReload(logStartOffset, segment.baseOffset, time.milliseconds)
logSegments(stateManager.mapEndOffset, segment.baseOffset).foreach { segment =>
val startOffset = math.max(segment.baseOffset, stateManager.mapEndOffset)
@@ -625,7 +625,7 @@ class Log(@volatile var dir: File,
segment.updateTxnIndex(completedTxn, lastStableOffset)
}
- // always update the last pid map offset so that the snapshot reflects the current offset
+ // always update the last producer id map offset so that the snapshot reflects the current offset
// even if there isn't any idempotent data being written
producerStateManager.updateMapEndOffset(appendInfo.lastOffset + 1)
@@ -779,8 +779,8 @@ class Log(@volatile var dir: File,
completedTxns: ListBuffer[CompletedTxn],
lastEntry: Option[ProducerIdEntry],
loadingFromLog: Boolean): Unit = {
- val pid = batch.producerId
- val appendInfo = producers.getOrElseUpdate(pid, new ProducerAppendInfo(pid, lastEntry, loadingFromLog))
+ val producerId = batch.producerId
+ val appendInfo = producers.getOrElseUpdate(producerId, new ProducerAppendInfo(producerId, lastEntry, loadingFromLog))
val shouldValidateSequenceNumbers = topicPartition.topic != Topic.GROUP_METADATA_TOPIC_NAME
val maybeCompletedTxn = appendInfo.append(batch, shouldValidateSequenceNumbers)
maybeCompletedTxn.foreach(completedTxns += _)
@@ -1551,7 +1551,7 @@ object Log {
new File(dir, filenamePrefixFromOffset(offset) + TimeIndexFileSuffix)
/**
- * Construct a PID snapshot file using the given offset.
+ * Construct a producer id snapshot file using the given offset.
*
* @param dir The directory in which the log will reside
* @param offset The last offset (exclusive) included in the snapshot
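For orientation, such a helper typically mirrors the zero-padded naming used for segment and index files. A hedged sketch, assuming a 20-digit offset prefix and a ".snapshot" suffix:

  import java.io.File

  // Hypothetical: offset 42 in dir -> "00000000000000000042.snapshot"
  def producerSnapshotFile(dir: File, offset: Long): File =
    new File(dir, f"$offset%020d" + ".snapshot")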
http://git-wip-us.apache.org/repos/asf/kafka/blob/3e666900/core/src/main/scala/kafka/log/LogManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala
index af771f1..4ce4716 100755
--- a/core/src/main/scala/kafka/log/LogManager.scala
+++ b/core/src/main/scala/kafka/log/LogManager.scala
@@ -173,7 +173,7 @@ class LogManager(val logDirs: Array[File],
config = config,
logStartOffset = logStartOffset,
recoveryPoint = logRecoveryPoint,
- maxPidExpirationMs = maxPidExpirationMs,
+ maxProducerIdExpirationMs = maxPidExpirationMs,
scheduler = scheduler,
time = time)
if (logDir.getName.endsWith(Log.DeleteDirSuffix)) {
@@ -414,7 +414,7 @@ class LogManager(val logDirs: Array[File],
config = config,
logStartOffset = 0L,
recoveryPoint = 0L,
- maxPidExpirationMs = maxPidExpirationMs,
+ maxProducerIdExpirationMs = maxPidExpirationMs,
scheduler = scheduler,
time = time)
logs.put(topicPartition, log)
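Note that the right-hand side in both call sites above is still LogManager's own maxPidExpirationMs value; this commit renames only the Log constructor parameter it is passed to.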
http://git-wip-us.apache.org/repos/asf/kafka/blob/3e666900/core/src/main/scala/kafka/log/ProducerStateManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/ProducerStateManager.scala b/core/src/main/scala/kafka/log/ProducerStateManager.scala
index d7b1c33..02609b2 100644
--- a/core/src/main/scala/kafka/log/ProducerStateManager.scala
+++ b/core/src/main/scala/kafka/log/ProducerStateManager.scala
@@ -122,7 +122,7 @@ private[log] class ProducerAppendInfo(val producerId: Long, initialEntry: Produc
shouldValidateSequenceNumbers: Boolean): Unit = {
if (epoch != RecordBatch.NO_PRODUCER_EPOCH && !loadingFromLog)
// skip validation if this is the first entry when loading from the log. Log retention
- // will generally have removed the beginning entries from each PID
+ // will generally have removed the beginning entries from each producer id
validateAppend(epoch, firstSeq, lastSeq, shouldValidateSequenceNumbers)
this.producerEpoch = epoch
@@ -303,18 +303,18 @@ object ProducerStateManager {
}
/**
- * Maintains a mapping from ProducerIds (PIDs) to metadata about the last appended entries (e.g.
+ * Maintains a mapping from ProducerIds to metadata about the last appended entries (e.g.
* epoch, sequence number, last offset, etc.)
*
* The sequence number is the last number successfully appended to the partition for the given identifier.
* The epoch is used for fencing against zombie writers. The offset is the one of the last successful message
* appended to the partition.
*
- * As long as a PID is contained in the map, the corresponding producer can continue to write data.
- * However, PIDs can be expired due to lack of recent use or if the last written entry has been deleted from
+ * As long as a producer id is contained in the map, the corresponding producer can continue to write data.
+ * However, producer ids can be expired due to lack of recent use or if the last written entry has been deleted from
* the log (e.g. if the retention policy is "delete"). For compacted topics, the log cleaner will ensure
- * that the most recent entry from a given PID is retained in the log provided it hasn't expired due to
- * age. This ensures that PIDs will not be expired until either the max expiration time has been reached,
+ * that the most recent entry from a given producer id is retained in the log provided it hasn't expired due to
+ * age. This ensures that producer ids will not be expired until either the max expiration time has been reached,
* or if the topic also is configured for deletion, the segment containing the last written offset has
* been deleted.
*/
@@ -415,7 +415,7 @@ class ProducerStateManager(val topicPartition: TopicPartition,
producerIdEntry.currentTxnFirstOffset.isEmpty && currentTimeMs - producerIdEntry.timestamp >= maxPidExpirationMs
/**
- * Expire any PIDs which have been idle longer than the configured maximum expiration timeout.
+ * Expire any producer ids which have been idle longer than the configured maximum expiration timeout.
*/
def removeExpiredProducers(currentTimeMs: Long) {
producers.retain { case (producerId, lastEntry) =>
@@ -424,7 +424,7 @@ class ProducerStateManager(val topicPartition: TopicPartition,
}
/**
- * Truncate the PID mapping to the given offset range and reload the entries from the most recent
+ * Truncate the producer id mapping to the given offset range and reload the entries from the most recent
* snapshot in range (if there is one). Note that the log end offset is assumed to be less than
* or equal to the high watermark.
*/
@@ -451,7 +451,7 @@ class ProducerStateManager(val topicPartition: TopicPartition,
*/
def update(appendInfo: ProducerAppendInfo): Unit = {
if (appendInfo.producerId == RecordBatch.NO_PRODUCER_ID)
- throw new IllegalArgumentException("Invalid PID passed to update")
+ throw new IllegalArgumentException(s"Invalid producer id ${appendInfo.producerId} passed to update")
val entry = appendInfo.lastEntry
producers.put(appendInfo.producerId, entry)
@@ -465,7 +465,7 @@ class ProducerStateManager(val topicPartition: TopicPartition,
}
/**
- * Get the last written entry for the given PID.
+ * Get the last written entry for the given producer id.
*/
def lastEntry(producerId: Long): Option[ProducerIdEntry] = producers.get(producerId)
@@ -532,7 +532,7 @@ class ProducerStateManager(val topicPartition: TopicPartition,
}
/**
- * Truncate the PID mapping and remove all snapshots. This resets the state of the mapping.
+ * Truncate the producer id mapping and remove all snapshots. This resets the state of the mapping.
*/
def truncate() {
producers.clear()
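To make the scaladoc above concrete: the structure amounts to a map from producer id to the last appended entry, pruned when a producer goes idle. A toy sketch, with Entry and its fields invented for the example:

  import scala.collection.mutable

  case class Entry(lastSeq: Int, lastOffset: Long, timestampMs: Long,
                   currentTxnFirstOffset: Option[Long])

  class ToyProducerStateMap(maxIdleMs: Long) {
    private val producers = mutable.Map.empty[Long, Entry]

    def update(producerId: Long, entry: Entry): Unit =
      producers.put(producerId, entry)

    def lastEntry(producerId: Long): Option[Entry] = producers.get(producerId)

    // Mirrors removeExpiredProducers above: a producer id survives if it
    // has an open transaction or wrote recently enough.
    def removeExpired(nowMs: Long): Unit =
      producers.retain { case (_, e) =>
        e.currentTxnFirstOffset.isDefined || nowMs - e.timestampMs < maxIdleMs
      }
  }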
http://git-wip-us.apache.org/repos/asf/kafka/blob/3e666900/core/src/main/scala/kafka/server/KafkaConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 5ee4b12..99eddab 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -592,7 +592,7 @@ object KafkaConfig {
val TransactionsMaxTimeoutMsDoc = "The maximum allowed timeout for transactions. " +
"If a client’s requested transaction time exceed this, then the broker will return an error in InitProducerIdRequest. This prevents a client from too large of a timeout, which can stall consumers reading from topics included in the transaction."
val TransactionsTopicMinISRDoc = "Overridden " + MinInSyncReplicasProp + " config for the transaction topic."
- val TransactionsLoadBufferSizeDoc = "Batch size for reading from the transaction log segments when loading pid and transactions into the cache."
+ val TransactionsLoadBufferSizeDoc = "Batch size for reading from the transaction log segments when loading producer ids and transactions into the cache."
val TransactionsTopicReplicationFactorDoc = "The replication factor for the transaction topic (set higher to ensure availability). " +
"Internal topic creation will fail until the cluster size meets this replication factor requirement."
val TransactionsTopicPartitionsDoc = "The number of partitions for the transaction topic (should not change after deployment)."
@@ -610,9 +610,9 @@ object KafkaConfig {
val ReplicationQuotaWindowSizeSecondsDoc = "The time span of each sample for replication quotas"
/** ********* Transaction Configuration ***********/
val TransactionIdExpirationMsDoc = "The maximum time of inactivity before a transactional id is expired by the " +
- "transaction coordinator. Note that this also influences PID expiration: PIDs are guaranteed to expire " +
- "after expiration of this timeout from the last write by the PID (they may expire sooner if the last write " +
- "from the PID is deleted due to the topic's retention settings)."
+ "transaction coordinator. Note that this also influences producer id expiration: Producer ids are guaranteed to expire " +
+ "after expiration of this timeout from the last write by the producer id (they may expire sooner if the last write " +
+ "from the producer id is deleted due to the topic's retention settings)."
val DeleteTopicEnableDoc = "Enables topic deletion. Deleting topics through the admin tool will have no effect if this config is turned off"
val CompressionTypeDoc = "Specify the final compression type for a given topic. This configuration accepts the standard compression codecs " +
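For reference, the settings documented in this hunk are exposed as broker properties along these lines (the values shown are illustrative only, not recommendations):

  transaction.max.timeout.ms=900000
  transaction.state.log.load.buffer.size=5242880
  transactional.id.expiration.ms=604800000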
http://git-wip-us.apache.org/repos/asf/kafka/blob/3e666900/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala b/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala
index eab3258..bcf2b58 100644
--- a/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala
+++ b/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala
@@ -54,36 +54,36 @@ object ConsumerOffsetChecker extends Logging {
}
private def processPartition(zkUtils: ZkUtils,
- group: String, topic: String, pid: Int) {
- val topicPartition = TopicAndPartition(topic, pid)
+ group: String, topic: String, partitionId: Int) {
+ val topicPartition = TopicAndPartition(topic, partitionId)
val offsetOpt = offsetMap.get(topicPartition)
val groupDirs = new ZKGroupTopicDirs(group, topic)
- val owner = zkUtils.readDataMaybeNull(groupDirs.consumerOwnerDir + "/%s".format(pid))._1
- zkUtils.getLeaderForPartition(topic, pid) match {
+ val owner = zkUtils.readDataMaybeNull(groupDirs.consumerOwnerDir + "/%s".format(partitionId))._1
+ zkUtils.getLeaderForPartition(topic, partitionId) match {
case Some(bid) =>
val consumerOpt = consumerMap.getOrElseUpdate(bid, getConsumer(zkUtils, bid))
consumerOpt match {
case Some(consumer) =>
- val topicAndPartition = TopicAndPartition(topic, pid)
+ val topicAndPartition = TopicAndPartition(topic, partitionId)
val request =
OffsetRequest(immutable.Map(topicAndPartition -> PartitionOffsetRequestInfo(OffsetRequest.LatestTime, 1)))
val logSize = consumer.getOffsetsBefore(request).partitionErrorAndOffsets(topicAndPartition).offsets.head
val lagString = offsetOpt.map(o => if (o == -1) "unknown" else (logSize - o).toString)
- println("%-15s %-30s %-3s %-15s %-15s %-15s %s".format(group, topic, pid, offsetOpt.getOrElse("unknown"), logSize, lagString.getOrElse("unknown"),
+ println("%-15s %-30s %-3s %-15s %-15s %-15s %s".format(group, topic, producerId, offsetOpt.getOrElse("unknown"), logSize, lagString.getOrElse("unknown"),
owner match {case Some(ownerStr) => ownerStr case None => "none"}))
case None => // ignore
}
case None =>
- println("No broker for partition %s - %s".format(topic, pid))
+ println("No broker for partition %s - %s".format(topic, producerId))
}
}
private def processTopic(zkUtils: ZkUtils, group: String, topic: String) {
topicPidMap.get(topic) match {
- case Some(pids) =>
- pids.sorted.foreach {
- pid => processPartition(zkUtils, group, topic, pid)
+ case Some(partitionIds) =>
+ partitionIds.sorted.foreach {
+ partitionId => processPartition(zkUtils, group, topic, partitionId)
}
case None => // ignore
}
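The lag printed per partition above is simply the log end offset minus the group's committed offset, with -1 rendered as unknown. In isolation, with names invented for the sketch:

  // Sketch of the lag computation used in processPartition above.
  def lag(logEndOffset: Long, committedOffset: Long): String =
    if (committedOffset == -1) "unknown"
    else (logEndOffset - committedOffset).toString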
http://git-wip-us.apache.org/repos/asf/kafka/blob/3e666900/core/src/main/scala/kafka/tools/DumpLogSegments.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/DumpLogSegments.scala b/core/src/main/scala/kafka/tools/DumpLogSegments.scala
index 0b0ad7b..4d35a85 100755
--- a/core/src/main/scala/kafka/tools/DumpLogSegments.scala
+++ b/core/src/main/scala/kafka/tools/DumpLogSegments.scala
@@ -136,7 +136,7 @@ object DumpLogSegments {
private def dumpTxnIndex(file: File): Unit = {
val index = new TransactionIndex(Log.offsetFromFilename(file.getName), file)
for (abortedTxn <- index.allAbortedTxns) {
- println(s"version: ${abortedTxn.version} pid: ${abortedTxn.producerId} firstOffset: ${abortedTxn.firstOffset} " +
+ println(s"version: ${abortedTxn.version} producerId: ${abortedTxn.producerId} firstOffset: ${abortedTxn.firstOffset} " +
s"lastOffset: ${abortedTxn.lastOffset} lastStableOffset: ${abortedTxn.lastStableOffset}")
}
}
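With the rename, a dumped transaction index entry prints along these lines (the numbers are made up for illustration):

  version: 0 producerId: 1000 firstOffset: 42 lastOffset: 57 lastStableOffset: 58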
http://git-wip-us.apache.org/repos/asf/kafka/blob/3e666900/core/src/main/scala/kafka/utils/ZkUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala
index c12f774..fc78501 100644
--- a/core/src/main/scala/kafka/utils/ZkUtils.scala
+++ b/core/src/main/scala/kafka/utils/ZkUtils.scala
@@ -64,7 +64,7 @@ object ZkUtils {
val BrokerSequenceIdPath = s"$BrokersPath/seqid"
val ConfigChangesPath = s"$ConfigPath/changes"
val ConfigUsersPath = s"$ConfigPath/users"
- val ProducerIdBlockPath = "/latest_pid_block"
+ val ProducerIdBlockPath = "/latest_producer_id_block"
// Important: it is necessary to add any new top level Zookeeper path to the Seq
val SecureZkRootPaths = Seq(AdminPath,
BrokersPath,
http://git-wip-us.apache.org/repos/asf/kafka/blob/3e666900/core/src/test/scala/unit/kafka/log/LogTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala
index aaef466..e545255 100755
--- a/core/src/test/scala/unit/kafka/log/LogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogTest.scala
@@ -98,7 +98,7 @@ class LogTest {
LogConfig(logProps),
logStartOffset = 0L,
recoveryPoint = 0L,
- maxPidExpirationMs = 24 * 60,
+ maxProducerIdExpirationMs = 24 * 60,
scheduler = time.scheduler,
time = time)
assertEquals("Log begins with a single empty segment.", 1, log.numberOfSegments)
@@ -2436,8 +2436,8 @@ class LogTest {
recoveryPoint = 0L,
scheduler = time.scheduler,
time = time,
- maxPidExpirationMs = maxPidExpirationMs,
- pidExpirationCheckIntervalMs = pidExpirationCheckIntervalMs)
+ maxProducerIdExpirationMs = maxPidExpirationMs,
+ producerIdExpirationCheckIntervalMs = pidExpirationCheckIntervalMs)
log
}