You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2017/05/15 18:28:47 UTC

kafka git commit: MINOR: Eliminate PID terminology from non test code

Repository: kafka
Updated Branches:
  refs/heads/trunk e40e27b4e -> 3e6669000


MINOR: Eliminate PID terminology from non test code

Producer id is used instead.

Also refactored TransactionLog schema code to follow
our naming convention and to have better structure.

Author: Ismael Juma <is...@juma.me.uk>

Reviewers: Guozhang Wang <wa...@gmail.com>, Jason Gustafson <ja...@confluent.io>

Closes #3041 from ijuma/eliminate-pid-terminology


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/3e666900
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/3e666900
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/3e666900

Branch: refs/heads/trunk
Commit: 3e6669000f082808999a7216b00c4b0f5a94e1da
Parents: e40e27b
Author: Ismael Juma <is...@juma.me.uk>
Authored: Mon May 15 11:26:08 2017 -0700
Committer: Jason Gustafson <ja...@confluent.io>
Committed: Mon May 15 11:26:08 2017 -0700

----------------------------------------------------------------------
 .../producer/internals/RecordAccumulator.java   |  10 +-
 .../clients/producer/internals/Sender.java      |  27 +--
 .../producer/internals/TransactionManager.java  |  10 +-
 .../apache/kafka/common/protocol/Errors.java    |   4 +-
 .../kafka/common/record/DefaultRecordBatch.java |   4 +-
 .../common/record/MemoryRecordsBuilder.java     |  10 +-
 .../apache/kafka/common/record/RecordBatch.java |   4 +-
 .../common/requests/AddOffsetsToTxnRequest.java |   6 +-
 .../requests/AddPartitionsToTxnRequest.java     |  12 +-
 .../kafka/common/requests/EndTxnRequest.java    |  12 +-
 .../kafka/common/requests/FetchResponse.java    |   6 +-
 .../common/requests/TxnOffsetCommitRequest.java |  12 +-
 .../common/requests/WriteTxnMarkersRequest.java |  12 +-
 .../requests/WriteTxnMarkersResponse.java       |   6 +-
 .../clients/producer/internals/SenderTest.java  |   4 +-
 .../internals/TransactionManagerTest.java       |   4 +-
 .../kafka/coordinator/group/GroupMetadata.scala |   2 +-
 .../transaction/TransactionLog.scala            | 193 ++++++++++---------
 ...nsactionMarkerRequestCompletionHandler.scala |   2 +-
 .../transaction/TransactionMetadata.scala       |   6 +-
 .../transaction/TransactionStateManager.scala   |   2 +-
 core/src/main/scala/kafka/log/Log.scala         |  22 +--
 core/src/main/scala/kafka/log/LogManager.scala  |   4 +-
 .../scala/kafka/log/ProducerStateManager.scala  |  22 +--
 .../main/scala/kafka/server/KafkaConfig.scala   |   8 +-
 .../kafka/tools/ConsumerOffsetChecker.scala     |  20 +-
 .../scala/kafka/tools/DumpLogSegments.scala     |   2 +-
 core/src/main/scala/kafka/utils/ZkUtils.scala   |   2 +-
 .../src/test/scala/unit/kafka/log/LogTest.scala |   6 +-
 29 files changed, 221 insertions(+), 213 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/3e666900/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
index cf3736c..d53c19d 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
@@ -446,19 +446,19 @@ public final class RecordAccumulator {
                                     } else {
                                         ProducerIdAndEpoch producerIdAndEpoch = null;
                                         if (transactionManager != null) {
-                                            producerIdAndEpoch = transactionManager.pidAndEpoch();
+                                            producerIdAndEpoch = transactionManager.producerIdAndEpoch();
                                             if (!producerIdAndEpoch.isValid())
-                                                // we cannot send the batch until we have refreshed the PID
+                                                // we cannot send the batch until we have refreshed the producer id
                                                 break;
                                         }
 
                                         ProducerBatch batch = deque.pollFirst();
                                         if (producerIdAndEpoch != null && !batch.inRetry()) {
-                                            // If the batch is in retry, then we should not change the pid and
+                                            // If the batch is in retry, then we should not change the producer id and
                                             // sequence number, since this may introduce duplicates. In particular,
                                             // the previous attempt may actually have been accepted, and if we change
-                                            // the pid and sequence here, this attempt will also be accepted, causing
-                                            // a duplicate.
+                                            // the producer id and sequence here, this attempt will also be accepted,
+                                            // causing a duplicate.
                                             int sequenceNumber = transactionManager.sequenceNumber(batch.topicPartition);
                                             log.debug("Dest: {} : producerId: {}, epoch: {}, Assigning sequence for {}: {}",
                                                     node, producerIdAndEpoch.producerId, producerIdAndEpoch.epoch,

http://git-wip-us.apache.org/repos/asf/kafka/blob/3e666900/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
index 8b96b41..da09a1a 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
@@ -108,7 +108,7 @@ public class Sender implements Runnable {
     /* current request API versions supported by the known brokers */
     private final ApiVersions apiVersions;
 
-    /* all the state related to transactions, in particular the PID, epoch, and sequence numbers */
+    /* all the state related to transactions, in particular the producer id, producer epoch, and sequence numbers */
     private final TransactionManager transactionManager;
 
     public Sender(KafkaClient client,
@@ -197,7 +197,7 @@ public class Sender implements Runnable {
 
     private long sendProducerData(long now) {
         Cluster cluster = metadata.fetch();
-        maybeWaitForPid();
+        maybeWaitForProducerId();
 
         // get the list of partitions with data ready to send
         RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(cluster, now);
@@ -237,7 +237,7 @@ public class Sender implements Runnable {
         List<ProducerBatch> expiredBatches = this.accumulator.abortExpiredBatches(this.requestTimeout, now);
 
         boolean needsTransactionStateReset = false;
-        // Reset the PID if an expired batch has previously been sent to the broker. Also update the metrics
+        // Reset the producer id if an expired batch has previously been sent to the broker. Also update the metrics
         // for expired batches. see the documentation of @TransactionState.resetProducerId to understand why
         // we need to reset the producer id here.
         for (ProducerBatch expiredBatch : expiredBatches) {
@@ -370,8 +370,8 @@ public class Sender implements Runnable {
         return null;
     }
 
-    private void maybeWaitForPid() {
-        // If this is a transactional producer, the PID will be received when recovering transactions in the
+    private void maybeWaitForProducerId() {
+        // If this is a transactional producer, the producer id will be received when recovering transactions in the
         // initTransactions() method of the producer.
         if (transactionManager == null || transactionManager.isTransactional())
             return;
@@ -395,7 +395,7 @@ public class Sender implements Runnable {
                             "We will back off and try again.");
                 }
             } catch (Exception e) {
-                log.warn("Received an exception while trying to get a pid. Will back off and retry.", e);
+                log.warn("Received an exception while trying to get a producer id. Will back off and retry.", e);
             }
             log.trace("Retry InitProducerIdRequest in {}ms.", retryBackoffMs);
             time.sleep(retryBackoffMs);
@@ -459,15 +459,16 @@ public class Sender implements Runnable {
                         error);
                 if (transactionManager == null) {
                     reenqueueBatch(batch, now);
-                } else if (transactionManager.pidAndEpoch().producerId == batch.producerId() && transactionManager.pidAndEpoch().epoch == batch.producerEpoch()) {
-                    // If idempotence is enabled only retry the request if the current PID is the same as the pid of the batch.
+                } else if (transactionManager.producerIdAndEpoch().producerId == batch.producerId() &&
+                        transactionManager.producerIdAndEpoch().epoch == batch.producerEpoch()) {
+                    // If idempotence is enabled only retry the request if the current producer id is the same as the producer id of the batch.
                     log.debug("Retrying batch to topic-partition {}. Sequence number : {}", batch.topicPartition,
                             transactionManager.sequenceNumber(batch.topicPartition));
                     reenqueueBatch(batch, now);
                 } else {
                     failBatch(batch, response, new OutOfOrderSequenceException("Attempted to retry sending a " +
                             "batch but the producer id changed from " + batch.producerId() + " to " +
-                            transactionManager.pidAndEpoch().producerId + " in the mean time. This batch will be dropped."));
+                            transactionManager.producerIdAndEpoch().producerId + " in the mean time. This batch will be dropped."));
                     this.sensors.recordErrors(batch.topicPartition.topic(), batch.recordCount);
                 }
             } else {
@@ -476,7 +477,7 @@ public class Sender implements Runnable {
                     exception = new TopicAuthorizationException(batch.topicPartition.topic());
                 else
                     exception = error.exception();
-                if (error == Errors.OUT_OF_ORDER_SEQUENCE_NUMBER && batch.producerId() == transactionManager.pidAndEpoch().producerId)
+                if (error == Errors.OUT_OF_ORDER_SEQUENCE_NUMBER && batch.producerId() == transactionManager.producerIdAndEpoch().producerId)
                     log.error("The broker received an out of order sequence number for correlation id {}, topic-partition " +
                                     "{} at offset {}. This indicates data loss on the broker, and should be investigated.",
                             correlationId, batch.topicPartition, response.baseOffset);
@@ -494,8 +495,8 @@ public class Sender implements Runnable {
         } else {
             completeBatch(batch, response);
 
-            if (transactionManager != null && transactionManager.pidAndEpoch().producerId == batch.producerId()
-                    && transactionManager.pidAndEpoch().epoch == batch.producerEpoch()) {
+            if (transactionManager != null && transactionManager.producerIdAndEpoch().producerId == batch.producerId()
+                    && transactionManager.producerIdAndEpoch().epoch == batch.producerEpoch()) {
                 transactionManager.incrementSequenceNumber(batch.topicPartition, batch.recordCount);
                 log.debug("Incremented sequence number for topic-partition {} to {}", batch.topicPartition,
                         transactionManager.sequenceNumber(batch.topicPartition));
@@ -519,7 +520,7 @@ public class Sender implements Runnable {
 
     private void failBatch(ProducerBatch batch, ProduceResponse.PartitionResponse response, RuntimeException exception) {
         if (transactionManager != null && !transactionManager.isTransactional()
-                && batch.producerId() == transactionManager.pidAndEpoch().producerId) {
+                && batch.producerId() == transactionManager.producerIdAndEpoch().producerId) {
             // Reset the transaction state since we have hit an irrecoverable exception and cannot make any guarantees
             // about the previously committed message. Note that this will discard the producer id and sequence
             // numbers for all existing partitions.

http://git-wip-us.apache.org/repos/asf/kafka/blob/3e666900/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
index 566ad7c..7e2f813 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
@@ -114,7 +114,7 @@ public class TransactionManager {
 
 
     // We use the priority to determine the order in which requests need to be sent out. For instance, if we have
-    // a pending FindCoordinator request, that must always go first. Next, If we need a PID, that must go second.
+    // a pending FindCoordinator request, that must always go first. Next, If we need a producer id, that must go second.
     // The endTxn request must always go last.
     private enum Priority {
         FIND_COORDINATOR(0),
@@ -262,17 +262,17 @@ public class TransactionManager {
     }
 
     /**
-     * Get the current pid and epoch without blocking. Callers must use {@link ProducerIdAndEpoch#isValid()} to
+     * Get the current producer id and epoch without blocking. Callers must use {@link ProducerIdAndEpoch#isValid()} to
      * verify that the result is valid.
      *
      * @return the current ProducerIdAndEpoch.
      */
-    ProducerIdAndEpoch pidAndEpoch() {
+    ProducerIdAndEpoch producerIdAndEpoch() {
         return producerIdAndEpoch;
     }
 
     /**
-     * Set the pid and epoch atomically.
+     * Set the producer id and epoch atomically.
      */
     void setProducerIdAndEpoch(ProducerIdAndEpoch producerIdAndEpoch) {
         this.producerIdAndEpoch = producerIdAndEpoch;
@@ -291,7 +291,7 @@ public class TransactionManager {
      * messages will return an OutOfOrderSequenceException.
      *
      * Note that we can't reset the producer state for the transactional producer as this would mean bumping the epoch
-     * for the same pid. This might involve aborting the ongoing transaction during the initPidRequest, and the user
+     * for the same producer id. This might involve aborting the ongoing transaction during the initPidRequest, and the user
      * would not have any way of knowing this happened. So for the transactional producer, it's best to return the
      * produce error to the user and let them abort the transaction and close the producer explicitly.
      */

http://git-wip-us.apache.org/repos/asf/kafka/blob/3e666900/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
index 58a0a2a..a0922cf 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
@@ -439,8 +439,8 @@ public enum Errors {
                 return new InvalidTxnStateException(message);
             }
         }),
-    INVALID_PRODUCER_ID_MAPPING(49, "The producer attempted to use a producerId which is not currently assigned to " +
-            "its transactionalId",
+    INVALID_PRODUCER_ID_MAPPING(49, "The producer attempted to use a producer id which is not currently assigned to " +
+            "its transactional id",
         new ApiExceptionBuilder() {
             @Override
             public ApiException build(String message) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/3e666900/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java b/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java
index f321c3b..74bd3c0 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java
@@ -361,7 +361,7 @@ public class DefaultRecordBatch extends AbstractRecordBatch implements MutableRe
                             TimestampType timestampType,
                             long baseTimestamp,
                             long maxTimestamp,
-                            long pid,
+                            long producerId,
                             short epoch,
                             int sequence,
                             boolean isTransactional,
@@ -384,7 +384,7 @@ public class DefaultRecordBatch extends AbstractRecordBatch implements MutableRe
         buffer.putLong(position + BASE_TIMESTAMP_OFFSET, baseTimestamp);
         buffer.putLong(position + MAX_TIMESTAMP_OFFSET, maxTimestamp);
         buffer.putInt(position + LAST_OFFSET_DELTA_OFFSET, lastOffsetDelta);
-        buffer.putLong(position + PRODUCER_ID_OFFSET, pid);
+        buffer.putLong(position + PRODUCER_ID_OFFSET, producerId);
         buffer.putShort(position + PRODUCER_EPOCH_OFFSET, epoch);
         buffer.putInt(position + BASE_SEQUENCE_OFFSET, sequence);
         buffer.putInt(position + RECORDS_COUNT_OFFSET, numRecords);

http://git-wip-us.apache.org/repos/asf/kafka/blob/3e666900/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
index 025b402..e52df76 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
@@ -95,7 +95,7 @@ public class MemoryRecordsBuilder {
      * @param timestampType The desired timestamp type. For magic > 0, this cannot be {@link TimestampType#NO_TIMESTAMP_TYPE}.
      * @param baseOffset The initial offset to use for
      * @param logAppendTime The log append time of this record set. Can be set to NO_TIMESTAMP if CREATE_TIME is used.
-     * @param producerId The producer ID (PID) associated with the producer writing this record set
+     * @param producerId The producer ID associated with the producer writing this record set
      * @param producerEpoch The epoch of the producer
      * @param baseSequence The sequence number of the first record in this set
      * @param isTransactional Whether or not the records are part of a transaction
@@ -212,15 +212,15 @@ public class MemoryRecordsBuilder {
         }
     }
 
-    public void setProducerState(long pid, short epoch, int baseSequence) {
+    public void setProducerState(long producerId, short epoch, int baseSequence) {
         if (isClosed()) {
             // Sequence numbers are assigned when the batch is closed while the accumulator is being drained.
             // If the resulting ProduceRequest to the partition leader failed for a retriable error, the batch will
-            // be re queued. In this case, we should not attempt to set the state again, since changing the pid and sequence
+            // be re queued. In this case, we should not attempt to set the state again, since changing the producerId and sequence
             // once a batch has been sent to the broker risks introducing duplicates.
             throw new IllegalStateException("Trying to set producer state of an already closed batch. This indicates a bug on the client.");
         }
-        this.producerId = pid;
+        this.producerId = producerId;
         this.producerEpoch = epoch;
         this.baseSequence = baseSequence;
     }
@@ -691,7 +691,7 @@ public class MemoryRecordsBuilder {
     }
 
     /**
-     * Return the ProducerId (PID) of the RecordBatches created by this builder.
+     * Return the producer id of the RecordBatches created by this builder.
      */
     public long producerId() {
         return this.producerId;

http://git-wip-us.apache.org/repos/asf/kafka/blob/3e666900/clients/src/main/java/org/apache/kafka/common/record/RecordBatch.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/RecordBatch.java b/clients/src/main/java/org/apache/kafka/common/record/RecordBatch.java
index c984deb..42b0c2e 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/RecordBatch.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/RecordBatch.java
@@ -132,9 +132,9 @@ public interface RecordBatch extends Iterable<Record> {
     byte magic();
 
     /**
-     * Get the producer ID (PID) for this log record batch. For older magic versions, this will return 0.
+     * Get the producer id for this log record batch. For older magic versions, this will return 0.
      *
-     * @return The PID or -1 if there is none
+     * @return The producer id or -1 if there is none
      */
     long producerId();
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/3e666900/clients/src/main/java/org/apache/kafka/common/requests/AddOffsetsToTxnRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AddOffsetsToTxnRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/AddOffsetsToTxnRequest.java
index b017242..4bf8b3e 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/AddOffsetsToTxnRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/AddOffsetsToTxnRequest.java
@@ -24,7 +24,7 @@ import java.nio.ByteBuffer;
 
 public class AddOffsetsToTxnRequest extends AbstractRequest {
     private static final String TRANSACTIONAL_ID_KEY_NAME = "transactional_id";
-    private static final String PID_KEY_NAME = "producer_id";
+    private static final String PRODUCER_ID_KEY_NAME = "producer_id";
     private static final String EPOCH_KEY_NAME = "producer_epoch";
     private static final String CONSUMER_GROUP_ID_KEY_NAME = "consumer_group_id";
 
@@ -68,7 +68,7 @@ public class AddOffsetsToTxnRequest extends AbstractRequest {
     public AddOffsetsToTxnRequest(Struct struct, short version) {
         super(version);
         this.transactionalId = struct.getString(TRANSACTIONAL_ID_KEY_NAME);
-        this.producerId = struct.getLong(PID_KEY_NAME);
+        this.producerId = struct.getLong(PRODUCER_ID_KEY_NAME);
         this.producerEpoch = struct.getShort(EPOCH_KEY_NAME);
         this.consumerGroupId = struct.getString(CONSUMER_GROUP_ID_KEY_NAME);
     }
@@ -93,7 +93,7 @@ public class AddOffsetsToTxnRequest extends AbstractRequest {
     protected Struct toStruct() {
         Struct struct = new Struct(ApiKeys.ADD_OFFSETS_TO_TXN.requestSchema(version()));
         struct.set(TRANSACTIONAL_ID_KEY_NAME, transactionalId);
-        struct.set(PID_KEY_NAME, producerId);
+        struct.set(PRODUCER_ID_KEY_NAME, producerId);
         struct.set(EPOCH_KEY_NAME, producerEpoch);
         struct.set(CONSUMER_GROUP_ID_KEY_NAME, consumerGroupId);
         return struct;

http://git-wip-us.apache.org/repos/asf/kafka/blob/3e666900/clients/src/main/java/org/apache/kafka/common/requests/AddPartitionsToTxnRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AddPartitionsToTxnRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/AddPartitionsToTxnRequest.java
index 5bbea61..69ae25c 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/AddPartitionsToTxnRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/AddPartitionsToTxnRequest.java
@@ -29,8 +29,8 @@ import java.util.Map;
 
 public class AddPartitionsToTxnRequest extends AbstractRequest {
     private static final String TRANSACTIONAL_ID_KEY_NAME = "transactional_id";
-    private static final String PID_KEY_NAME = "producer_id";
-    private static final String EPOCH_KEY_NAME = "producer_epoch";
+    private static final String PRODUCER_ID_KEY_NAME = "producer_id";
+    private static final String PRODUCER_EPOCH_KEY_NAME = "producer_epoch";
     private static final String TOPIC_PARTITIONS_KEY_NAME = "topics";
     private static final String TOPIC_KEY_NAME = "topic";
     private static final String PARTITIONS_KEY_NAME = "partitions";
@@ -72,8 +72,8 @@ public class AddPartitionsToTxnRequest extends AbstractRequest {
     public AddPartitionsToTxnRequest(Struct struct, short version) {
         super(version);
         this.transactionalId = struct.getString(TRANSACTIONAL_ID_KEY_NAME);
-        this.producerId = struct.getLong(PID_KEY_NAME);
-        this.producerEpoch = struct.getShort(EPOCH_KEY_NAME);
+        this.producerId = struct.getLong(PRODUCER_ID_KEY_NAME);
+        this.producerEpoch = struct.getShort(PRODUCER_EPOCH_KEY_NAME);
 
         List<TopicPartition> partitions = new ArrayList<>();
         Object[] topicPartitionsArray = struct.getArray(TOPIC_PARTITIONS_KEY_NAME);
@@ -107,8 +107,8 @@ public class AddPartitionsToTxnRequest extends AbstractRequest {
     protected Struct toStruct() {
         Struct struct = new Struct(ApiKeys.ADD_PARTITIONS_TO_TXN.requestSchema(version()));
         struct.set(TRANSACTIONAL_ID_KEY_NAME, transactionalId);
-        struct.set(PID_KEY_NAME, producerId);
-        struct.set(EPOCH_KEY_NAME, producerEpoch);
+        struct.set(PRODUCER_ID_KEY_NAME, producerId);
+        struct.set(PRODUCER_EPOCH_KEY_NAME, producerEpoch);
 
         Map<String, List<Integer>> mappedPartitions = CollectionUtils.groupDataByTopic(partitions);
         Object[] partitionsArray = new Object[mappedPartitions.size()];

http://git-wip-us.apache.org/repos/asf/kafka/blob/3e666900/clients/src/main/java/org/apache/kafka/common/requests/EndTxnRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/EndTxnRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/EndTxnRequest.java
index 9c215be..ff9b82c 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/EndTxnRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/EndTxnRequest.java
@@ -24,8 +24,8 @@ import java.nio.ByteBuffer;
 
 public class EndTxnRequest extends AbstractRequest {
     private static final String TRANSACTIONAL_ID_KEY_NAME = "transactional_id";
-    private static final String PID_KEY_NAME = "producer_id";
-    private static final String EPOCH_KEY_NAME = "producer_epoch";
+    private static final String PRODUCER_ID_KEY_NAME = "producer_id";
+    private static final String PRODUCER_EPOCH_KEY_NAME = "producer_epoch";
     private static final String TRANSACTION_RESULT_KEY_NAME = "transaction_result";
 
     public static class Builder extends AbstractRequest.Builder<EndTxnRequest> {
@@ -64,8 +64,8 @@ public class EndTxnRequest extends AbstractRequest {
     public EndTxnRequest(Struct struct, short version) {
         super(version);
         this.transactionalId = struct.getString(TRANSACTIONAL_ID_KEY_NAME);
-        this.producerId = struct.getLong(PID_KEY_NAME);
-        this.producerEpoch = struct.getShort(EPOCH_KEY_NAME);
+        this.producerId = struct.getLong(PRODUCER_ID_KEY_NAME);
+        this.producerEpoch = struct.getShort(PRODUCER_EPOCH_KEY_NAME);
         this.result = TransactionResult.forId(struct.getBoolean(TRANSACTION_RESULT_KEY_NAME));
     }
 
@@ -89,8 +89,8 @@ public class EndTxnRequest extends AbstractRequest {
     protected Struct toStruct() {
         Struct struct = new Struct(ApiKeys.END_TXN.requestSchema(version()));
         struct.set(TRANSACTIONAL_ID_KEY_NAME, transactionalId);
-        struct.set(PID_KEY_NAME, producerId);
-        struct.set(EPOCH_KEY_NAME, producerEpoch);
+        struct.set(PRODUCER_ID_KEY_NAME, producerId);
+        struct.set(PRODUCER_EPOCH_KEY_NAME, producerEpoch);
         struct.set(TRANSACTION_RESULT_KEY_NAME, result.id);
         return struct;
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/3e666900/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
index db12d26..0cb87b5 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
@@ -56,7 +56,7 @@ public class FetchResponse extends AbstractResponse {
     private static final String RECORD_SET_KEY_NAME = "record_set";
 
     // aborted transaction field names
-    private static final String PID_KEY_NAME = "producer_id";
+    private static final String PRODUCER_ID_KEY_NAME = "producer_id";
     private static final String FIRST_OFFSET_KEY_NAME = "first_offset";
 
     private static final int DEFAULT_THROTTLE_TIME = 0;
@@ -211,7 +211,7 @@ public class FetchResponse extends AbstractResponse {
                         abortedTransactions = new ArrayList<>(abortedTransactionsArray.length);
                         for (Object abortedTransactionObj : abortedTransactionsArray) {
                             Struct abortedTransactionStruct = (Struct) abortedTransactionObj;
-                            long producerId = abortedTransactionStruct.getLong(PID_KEY_NAME);
+                            long producerId = abortedTransactionStruct.getLong(PRODUCER_ID_KEY_NAME);
                             long firstOffset = abortedTransactionStruct.getLong(FIRST_OFFSET_KEY_NAME);
                             abortedTransactions.add(new AbortedTransaction(producerId, firstOffset));
                         }
@@ -339,7 +339,7 @@ public class FetchResponse extends AbstractResponse {
                         List<Struct> abortedTransactionStructs = new ArrayList<>(fetchPartitionData.abortedTransactions.size());
                         for (AbortedTransaction abortedTransaction : fetchPartitionData.abortedTransactions) {
                             Struct abortedTransactionStruct = partitionDataHeader.instance(ABORTED_TRANSACTIONS_KEY_NAME);
-                            abortedTransactionStruct.set(PID_KEY_NAME, abortedTransaction.producerId);
+                            abortedTransactionStruct.set(PRODUCER_ID_KEY_NAME, abortedTransaction.producerId);
                             abortedTransactionStruct.set(FIRST_OFFSET_KEY_NAME, abortedTransaction.firstOffset);
                             abortedTransactionStructs.add(abortedTransactionStruct);
                         }

http://git-wip-us.apache.org/repos/asf/kafka/blob/3e666900/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitRequest.java
index 8778b49..3f3024f 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitRequest.java
@@ -28,8 +28,8 @@ import java.util.Map;
 
 public class TxnOffsetCommitRequest extends AbstractRequest {
     private static final String CONSUMER_GROUP_ID_KEY_NAME = "consumer_group_id";
-    private static final String PID_KEY_NAME = "producer_id";
-    private static final String EPOCH_KEY_NAME = "producer_epoch";
+    private static final String PRODUCER_ID_KEY_NAME = "producer_id";
+    private static final String PRODUCER_EPOCH_KEY_NAME = "producer_epoch";
     private static final String RETENTION_TIME_KEY_NAME = "retention_time";
     private static final String TOPIC_PARTITIONS_KEY_NAME = "topics";
     private static final String TOPIC_KEY_NAME = "topic";
@@ -84,8 +84,8 @@ public class TxnOffsetCommitRequest extends AbstractRequest {
     public TxnOffsetCommitRequest(Struct struct, short version) {
         super(version);
         this.consumerGroupId = struct.getString(CONSUMER_GROUP_ID_KEY_NAME);
-        this.producerId = struct.getLong(PID_KEY_NAME);
-        this.producerEpoch = struct.getShort(EPOCH_KEY_NAME);
+        this.producerId = struct.getLong(PRODUCER_ID_KEY_NAME);
+        this.producerEpoch = struct.getShort(PRODUCER_EPOCH_KEY_NAME);
         this.retentionTimeMs = struct.getLong(RETENTION_TIME_KEY_NAME);
 
         Map<TopicPartition, CommittedOffset> offsets = new HashMap<>();
@@ -128,8 +128,8 @@ public class TxnOffsetCommitRequest extends AbstractRequest {
     protected Struct toStruct() {
         Struct struct = new Struct(ApiKeys.TXN_OFFSET_COMMIT.requestSchema(version()));
         struct.set(CONSUMER_GROUP_ID_KEY_NAME, consumerGroupId);
-        struct.set(PID_KEY_NAME, producerId);
-        struct.set(EPOCH_KEY_NAME, producerEpoch);
+        struct.set(PRODUCER_ID_KEY_NAME, producerId);
+        struct.set(PRODUCER_EPOCH_KEY_NAME, producerEpoch);
         struct.set(RETENTION_TIME_KEY_NAME, retentionTimeMs);
 
         Map<String, Map<Integer, CommittedOffset>> mappedPartitionOffsets = CollectionUtils.groupDataByTopic(offsets);

http://git-wip-us.apache.org/repos/asf/kafka/blob/3e666900/clients/src/main/java/org/apache/kafka/common/requests/WriteTxnMarkersRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/WriteTxnMarkersRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/WriteTxnMarkersRequest.java
index 0c09880..cf2c9fc 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/WriteTxnMarkersRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/WriteTxnMarkersRequest.java
@@ -33,8 +33,8 @@ public class WriteTxnMarkersRequest extends AbstractRequest {
     private static final String COORDINATOR_EPOCH_KEY_NAME = "coordinator_epoch";
     private static final String TXN_MARKER_ENTRY_KEY_NAME = "transaction_markers";
 
-    private static final String PID_KEY_NAME = "producer_id";
-    private static final String EPOCH_KEY_NAME = "producer_epoch";
+    private static final String PRODUCER_ID_KEY_NAME = "producer_id";
+    private static final String PRODUCER_EPOCH_KEY_NAME = "producer_epoch";
     private static final String TRANSACTION_RESULT_KEY_NAME = "transaction_result";
     private static final String TOPIC_PARTITIONS_KEY_NAME = "topics";
     private static final String TOPIC_KEY_NAME = "topic";
@@ -138,8 +138,8 @@ public class WriteTxnMarkersRequest extends AbstractRequest {
         for (Object markerObj : markersArray) {
             Struct markerStruct = (Struct) markerObj;
 
-            long producerId = markerStruct.getLong(PID_KEY_NAME);
-            short producerEpoch = markerStruct.getShort(EPOCH_KEY_NAME);
+            long producerId = markerStruct.getLong(PRODUCER_ID_KEY_NAME);
+            short producerEpoch = markerStruct.getShort(PRODUCER_EPOCH_KEY_NAME);
             int coordinatorEpoch = markerStruct.getInt(COORDINATOR_EPOCH_KEY_NAME);
             TransactionResult result = TransactionResult.forId(markerStruct.getBoolean(TRANSACTION_RESULT_KEY_NAME));
 
@@ -172,8 +172,8 @@ public class WriteTxnMarkersRequest extends AbstractRequest {
         int i = 0;
         for (TxnMarkerEntry entry : markers) {
             Struct markerStruct = struct.instance(TXN_MARKER_ENTRY_KEY_NAME);
-            markerStruct.set(PID_KEY_NAME, entry.producerId);
-            markerStruct.set(EPOCH_KEY_NAME, entry.producerEpoch);
+            markerStruct.set(PRODUCER_ID_KEY_NAME, entry.producerId);
+            markerStruct.set(PRODUCER_EPOCH_KEY_NAME, entry.producerEpoch);
             markerStruct.set(COORDINATOR_EPOCH_KEY_NAME, entry.coordinatorEpoch);
             markerStruct.set(TRANSACTION_RESULT_KEY_NAME, entry.result.id);
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/3e666900/clients/src/main/java/org/apache/kafka/common/requests/WriteTxnMarkersResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/WriteTxnMarkersResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/WriteTxnMarkersResponse.java
index 00133a6..06f6662 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/WriteTxnMarkersResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/WriteTxnMarkersResponse.java
@@ -29,7 +29,7 @@ import java.util.Map;
 public class WriteTxnMarkersResponse extends AbstractResponse {
     private static final String TXN_MARKER_ENTRY_KEY_NAME = "transaction_markers";
 
-    private static final String PID_KEY_NAME = "producer_id";
+    private static final String PRODUCER_ID_KEY_NAME = "producer_id";
     private static final String TOPIC_PARTITIONS_KEY_NAME = "topics";
     private static final String PARTITIONS_KEY_NAME = "partitions";
     private static final String TOPIC_KEY_NAME = "topic";
@@ -62,7 +62,7 @@ public class WriteTxnMarkersResponse extends AbstractResponse {
         for (Object responseObj : responseArray) {
             Struct responseStruct = (Struct) responseObj;
 
-            long producerId = responseStruct.getLong(PID_KEY_NAME);
+            long producerId = responseStruct.getLong(PRODUCER_ID_KEY_NAME);
 
             Map<TopicPartition, Errors> errorPerPartition = new HashMap<>();
             Object[] topicPartitionsArray = responseStruct.getArray(TOPIC_PARTITIONS_KEY_NAME);
@@ -90,7 +90,7 @@ public class WriteTxnMarkersResponse extends AbstractResponse {
         int k = 0;
         for (Map.Entry<Long, Map<TopicPartition, Errors>> responseEntry : errors.entrySet()) {
             Struct responseStruct = struct.instance(TXN_MARKER_ENTRY_KEY_NAME);
-            responseStruct.set(PID_KEY_NAME, responseEntry.getKey());
+            responseStruct.set(PRODUCER_ID_KEY_NAME, responseEntry.getKey());
 
             Map<TopicPartition, Errors> partitionAndErrors = responseEntry.getValue();
             Map<String, Map<Integer, Errors>> mappedPartitions = CollectionUtils.groupDataByTopic(partitionAndErrors);

http://git-wip-us.apache.org/repos/asf/kafka/blob/3e666900/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
index bb13dcb..1321fba 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
@@ -387,8 +387,8 @@ public class SenderTest {
         }, new InitProducerIdResponse(0, Errors.NONE, producerId, (short) 0));
         sender.run(time.milliseconds());
         assertTrue(transactionManager.hasProducerId());
-        assertEquals(producerId, transactionManager.pidAndEpoch().producerId);
-        assertEquals((short) 0, transactionManager.pidAndEpoch().epoch);
+        assertEquals(producerId, transactionManager.producerIdAndEpoch().producerId);
+        assertEquals((short) 0, transactionManager.producerIdAndEpoch().epoch);
     }
 
     @Test

http://git-wip-us.apache.org/repos/asf/kafka/blob/3e666900/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
index 53686e2..c0acfec 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
@@ -286,8 +286,8 @@ public class TransactionManagerTest {
 
         assertTrue(initPidResult.isCompleted()); // The future should only return after the second round of retries succeed.
         assertTrue(transactionManager.hasProducerId());
-        assertEquals(pid, transactionManager.pidAndEpoch().producerId);
-        assertEquals(epoch, transactionManager.pidAndEpoch().epoch);
+        assertEquals(pid, transactionManager.producerIdAndEpoch().producerId);
+        assertEquals(epoch, transactionManager.producerIdAndEpoch().epoch);
     }
 
     @Test

http://git-wip-us.apache.org/repos/asf/kafka/blob/3e666900/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala b/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala
index 8122694..2f76d63 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala
@@ -146,7 +146,7 @@ private[group] class GroupMetadata(val groupId: String, initialState: GroupState
   private val members = new mutable.HashMap[String, MemberMetadata]
   private val offsets = new mutable.HashMap[TopicPartition, OffsetAndMetadata]
   private val pendingOffsetCommits = new mutable.HashMap[TopicPartition, OffsetAndMetadata]
-  // A map from a PID to the open offset commits for that pid.
+  // A map from a producer id to the open offset commits for that producer id.
   private val pendingTransactionalOffsetCommits = new mutable.HashMap[Long, mutable.Map[TopicPartition, OffsetAndMetadata]]()
   private var receivedTransactionalOffsetCommits = false
   private var receivedConsumerOffsetCommits = false

http://git-wip-us.apache.org/repos/asf/kafka/blob/3e666900/core/src/main/scala/kafka/coordinator/transaction/TransactionLog.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionLog.scala b/core/src/main/scala/kafka/coordinator/transaction/TransactionLog.scala
index a180502..d0c9e87 100644
--- a/core/src/main/scala/kafka/coordinator/transaction/TransactionLog.scala
+++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionLog.scala
@@ -30,12 +30,12 @@ import org.apache.kafka.common.record.CompressionType
 import scala.collection.mutable
 
 /*
- * Messages stored for the transaction topic represent the pid and transactional status of the corresponding
+ * Messages stored for the transaction topic represent the producer id and transactional status of the corresponding
  * transactional id, which have versions for both the key and value fields. Key and value
  * versions are used to evolve the message formats:
  *
  * key version 0:               [transactionalId]
- *    -> value version 0:       [pid, epoch, expire_timestamp, status, [topic [partition], timestamp ]
+ *    -> value version 0:       [producer_id, producer_epoch, expire_timestamp, status, [topic [partition], timestamp]
  */
 object TransactionLog {
 
@@ -55,69 +55,73 @@ object TransactionLog {
   val EnforcedRequiredAcks: Short = (-1).toShort
 
   // log message formats
-  private val TXN_ID_KEY = "transactional_id"
-
-  private val PID_KEY = "pid"
-  private val EPOCH_KEY = "epoch"
-  private val TXN_TIMEOUT_KEY = "transaction_timeout"
-  private val TXN_STATUS_KEY = "transaction_status"
-  private val TXN_PARTITIONS_KEY = "transaction_partitions"
-  private val TXN_ENTRY_TIMESTAMP_FIELD = "transaction_entry_timestamp"
-  private val TXN_START_TIMESTAMP_FIELD = "transaction_start_timestamp"
-  private val TOPIC_KEY = "topic"
-  private val PARTITION_IDS_KEY = "partition_ids"
-
-  private val KEY_SCHEMA_V0 = new Schema(new Field(TXN_ID_KEY, STRING))
-  private val KEY_SCHEMA_TXN_ID_FIELD = KEY_SCHEMA_V0.get(TXN_ID_KEY)
-
-  private val VALUE_PARTITIONS_SCHEMA = new Schema(new Field(TOPIC_KEY, STRING),
-                                                   new Field(PARTITION_IDS_KEY, new ArrayOf(INT32)))
-  private val PARTITIONS_SCHEMA_TOPIC_FIELD = VALUE_PARTITIONS_SCHEMA.get(TOPIC_KEY)
-  private val PARTITIONS_SCHEMA_PARTITION_IDS_FIELD = VALUE_PARTITIONS_SCHEMA.get(PARTITION_IDS_KEY)
-
-  private val VALUE_SCHEMA_V0 = new Schema(new Field(PID_KEY, INT64, "Producer id in use by the transactional id."),
-                                           new Field(EPOCH_KEY, INT16, "Epoch associated with the producer id"),
-                                           new Field(TXN_TIMEOUT_KEY, INT32, "Transaction timeout in milliseconds"),
-                                           new Field(TXN_STATUS_KEY, INT8,
-                                             "TransactionState the transaction is in"),
-                                           new Field(TXN_PARTITIONS_KEY, ArrayOf.nullable(VALUE_PARTITIONS_SCHEMA),
-                                            "Set of partitions involved in the transaction"),
-                                           new Field(TXN_ENTRY_TIMESTAMP_FIELD, INT64, "Time the transaction was last updated"),
-                                           new Field(TXN_START_TIMESTAMP_FIELD, INT64, "Time the transaction was started"))
-  private val VALUE_SCHEMA_PID_FIELD = VALUE_SCHEMA_V0.get(PID_KEY)
-  private val VALUE_SCHEMA_EPOCH_FIELD = VALUE_SCHEMA_V0.get(EPOCH_KEY)
-  private val VALUE_SCHEMA_TXN_TIMEOUT_FIELD = VALUE_SCHEMA_V0.get(TXN_TIMEOUT_KEY)
-  private val VALUE_SCHEMA_TXN_STATUS_FIELD = VALUE_SCHEMA_V0.get(TXN_STATUS_KEY)
-  private val VALUE_SCHEMA_TXN_PARTITIONS_FIELD = VALUE_SCHEMA_V0.get(TXN_PARTITIONS_KEY)
-  private val VALUE_SCHEMA_TXN_ENTRY_TIMESTAMP_FIELD = VALUE_SCHEMA_V0.get(TXN_ENTRY_TIMESTAMP_FIELD)
-  private val VALUE_SCHEMA_TXN_START_TIMESTAMP_FIELD = VALUE_SCHEMA_V0.get(TXN_START_TIMESTAMP_FIELD)
-
-  private val KEY_SCHEMAS = Map(
-    0 -> KEY_SCHEMA_V0)
-
-  private val VALUE_SCHEMAS = Map(
-    0 -> VALUE_SCHEMA_V0)
-
-  private val CURRENT_KEY_SCHEMA_VERSION = 0.toShort
-  private val CURRENT_VALUE_SCHEMA_VERSION = 0.toShort
-
-  private val CURRENT_KEY_SCHEMA = schemaForKey(CURRENT_KEY_SCHEMA_VERSION)
-
-  private val CURRENT_VALUE_SCHEMA = schemaForValue(CURRENT_VALUE_SCHEMA_VERSION)
+
+  private object KeySchema {
+    private val TXN_ID_KEY = "transactional_id"
+
+    private val V0 = new Schema(new Field(TXN_ID_KEY, STRING))
+    private val SCHEMAS = Map(0 -> V0)
+
+    val CURRENT_VERSION = 0.toShort
+    val CURRENT = schemaForKey(CURRENT_VERSION)
+
+    val TXN_ID_FIELD = V0.get(TXN_ID_KEY)
+
+    def ofVersion(version: Int): Option[Schema] = SCHEMAS.get(version)
+  }
+
+  private object ValueSchema {
+    private val ProducerIdKey = "producer_id"
+    private val ProducerEpochKey = "producer_epoch"
+    private val TxnTimeoutKey = "transaction_timeout"
+    private val TxnStatusKey = "transaction_status"
+    private val TxnPartitionsKey = "transaction_partitions"
+    private val TxnEntryTimestampKey = "transaction_entry_timestamp"
+    private val TxnStartTimestampKey = "transaction_start_timestamp"
+
+    private val PartitionIdsKey = "partition_ids"
+    private val TopicKey = "topic"
+    private val PartitionsSchema = new Schema(new Field(TopicKey, STRING),
+      new Field(PartitionIdsKey, new ArrayOf(INT32)))
+
+    private val V0 = new Schema(new Field(ProducerIdKey, INT64, "Producer id in use by the transactional id."),
+      new Field(ProducerEpochKey, INT16, "Epoch associated with the producer id"),
+      new Field(TxnTimeoutKey, INT32, "Transaction timeout in milliseconds"),
+      new Field(TxnStatusKey, INT8,
+        "TransactionState the transaction is in"),
+      new Field(TxnPartitionsKey, ArrayOf.nullable(PartitionsSchema),
+        "Set of partitions involved in the transaction"),
+      new Field(TxnEntryTimestampKey, INT64, "Time the transaction was last updated"),
+      new Field(TxnStartTimestampKey, INT64, "Time the transaction was started"))
+
+    private val Schemas = Map(0 -> V0)
+
+    val CurrentVersion = 0.toShort
+    val Current = schemaForValue(CurrentVersion)
+
+    val ProducerIdField = V0.get(ProducerIdKey)
+    val ProducerEpochField = V0.get(ProducerEpochKey)
+    val TxnTimeoutField = V0.get(TxnTimeoutKey)
+    val TxnStatusField = V0.get(TxnStatusKey)
+    val TxnPartitionsField = V0.get(TxnPartitionsKey)
+    val TxnEntryTimestampField = V0.get(TxnEntryTimestampKey)
+    val TxnStartTimestampField = V0.get(TxnStartTimestampKey)
+
+    val PartitionsTopicField = PartitionsSchema.get(TopicKey)
+    val PartitionIdsField = PartitionsSchema.get(PartitionIdsKey)
+
+    def ofVersion(version: Int): Option[Schema] = Schemas.get(version)
+  }
 
   private def schemaForKey(version: Int) = {
-    val schemaOpt = KEY_SCHEMAS.get(version)
-    schemaOpt match {
-      case Some(schema) => schema
-      case _ => throw new KafkaException(s"Unknown transaction log message key schema version $version")
+    KeySchema.ofVersion(version).getOrElse {
+      throw new KafkaException(s"Unknown transaction log message key schema version $version")
     }
   }
 
   private def schemaForValue(version: Int) = {
-    val schemaOpt = VALUE_SCHEMAS.get(version)
-    schemaOpt match {
-      case Some(schema) => schema
-      case _ => throw new KafkaException(s"Unknown transaction log message value schema version $version")
+    ValueSchema.ofVersion(version).getOrElse {
+      throw new KafkaException(s"Unknown transaction log message value schema version $version")
     }
   }
 
@@ -127,11 +131,12 @@ object TransactionLog {
     * @return key bytes
     */
   private[coordinator] def keyToBytes(transactionalId: String): Array[Byte] = {
-    val key = new Struct(CURRENT_KEY_SCHEMA)
-    key.set(KEY_SCHEMA_TXN_ID_FIELD, transactionalId)
+    import KeySchema._
+    val key = new Struct(CURRENT)
+    key.set(TXN_ID_FIELD, transactionalId)
 
     val byteBuffer = ByteBuffer.allocate(2 /* version */ + key.sizeOf)
-    byteBuffer.putShort(CURRENT_KEY_SCHEMA_VERSION)
+    byteBuffer.putShort(CURRENT_VERSION)
     key.writeTo(byteBuffer)
     byteBuffer.array()
   }
@@ -142,37 +147,38 @@ object TransactionLog {
     * @return value payload bytes
     */
   private[coordinator] def valueToBytes(txnMetadata: TransactionMetadataTransition): Array[Byte] = {
-    val value = new Struct(CURRENT_VALUE_SCHEMA)
-    value.set(VALUE_SCHEMA_PID_FIELD, txnMetadata.producerId)
-    value.set(VALUE_SCHEMA_EPOCH_FIELD, txnMetadata.producerEpoch)
-    value.set(VALUE_SCHEMA_TXN_TIMEOUT_FIELD, txnMetadata.txnTimeoutMs)
-    value.set(VALUE_SCHEMA_TXN_STATUS_FIELD, txnMetadata.txnState.byte)
-    value.set(VALUE_SCHEMA_TXN_ENTRY_TIMESTAMP_FIELD, txnMetadata.txnLastUpdateTimestamp)
-    value.set(VALUE_SCHEMA_TXN_START_TIMESTAMP_FIELD, txnMetadata.txnStartTimestamp)
+    import ValueSchema._
+    val value = new Struct(Current)
+    value.set(ProducerIdField, txnMetadata.producerId)
+    value.set(ProducerEpochField, txnMetadata.producerEpoch)
+    value.set(TxnTimeoutField, txnMetadata.txnTimeoutMs)
+    value.set(TxnStatusField, txnMetadata.txnState.byte)
+    value.set(TxnEntryTimestampField, txnMetadata.txnLastUpdateTimestamp)
+    value.set(TxnStartTimestampField, txnMetadata.txnStartTimestamp)
 
     if (txnMetadata.txnState == Empty) {
       if (txnMetadata.topicPartitions.nonEmpty)
         throw new IllegalStateException(s"Transaction is not expected to have any partitions since its state is ${txnMetadata.txnState}: $txnMetadata")
 
-      value.set(VALUE_SCHEMA_TXN_PARTITIONS_FIELD, null)
+      value.set(TxnPartitionsField, null)
     } else {
       // first group the topic partitions by their topic names
       val topicAndPartitions = txnMetadata.topicPartitions.groupBy(_.topic())
 
       val partitionArray = topicAndPartitions.map { case(topic, partitions) =>
-        val topicPartitionsStruct = value.instance(VALUE_SCHEMA_TXN_PARTITIONS_FIELD)
+        val topicPartitionsStruct = value.instance(TxnPartitionsField)
         val partitionIds: Array[Integer] = partitions.map(topicPartition => Integer.valueOf(topicPartition.partition())).toArray
 
-        topicPartitionsStruct.set(PARTITIONS_SCHEMA_TOPIC_FIELD, topic)
-        topicPartitionsStruct.set(PARTITIONS_SCHEMA_PARTITION_IDS_FIELD, partitionIds)
+        topicPartitionsStruct.set(PartitionsTopicField, topic)
+        topicPartitionsStruct.set(PartitionIdsField, partitionIds)
 
         topicPartitionsStruct
       }
-      value.set(VALUE_SCHEMA_TXN_PARTITIONS_FIELD, partitionArray.toArray)
+      value.set(TxnPartitionsField, partitionArray.toArray)
     }
 
     val byteBuffer = ByteBuffer.allocate(2 /* version */ + value.sizeOf)
-    byteBuffer.putShort(CURRENT_VALUE_SCHEMA_VERSION)
+    byteBuffer.putShort(CurrentVersion)
     value.writeTo(byteBuffer)
     byteBuffer.array()
   }
@@ -187,8 +193,8 @@ object TransactionLog {
     val keySchema = schemaForKey(version)
     val key = keySchema.read(buffer)
 
-    if (version == CURRENT_KEY_SCHEMA_VERSION) {
-      val transactionalId = key.getString(KEY_SCHEMA_TXN_ID_FIELD)
+    if (version == KeySchema.CURRENT_VERSION) {
+      val transactionalId = key.getString(KeySchema.TXN_ID_FIELD)
 
       TxnKey(version, transactionalId)
     } else {
@@ -197,37 +203,38 @@ object TransactionLog {
   }
 
   /**
-    * Decodes the transaction log messages' payload and retrieves pid metadata from it
+    * Decodes the transaction log messages' payload and retrieves the transaction metadata from it
     *
-    * @return a pid metadata object from the message
+    * @return a transaction metadata object from the message
     */
   def readMessageValue(buffer: ByteBuffer): TransactionMetadata = {
     if (buffer == null) { // tombstone
       null
     } else {
+      import ValueSchema._
       val version = buffer.getShort
       val valueSchema = schemaForValue(version)
       val value = valueSchema.read(buffer)
 
-      if (version == CURRENT_VALUE_SCHEMA_VERSION) {
-        val pid = value.get(VALUE_SCHEMA_PID_FIELD).asInstanceOf[Long]
-        val epoch = value.get(VALUE_SCHEMA_EPOCH_FIELD).asInstanceOf[Short]
-        val timeout = value.get(VALUE_SCHEMA_TXN_TIMEOUT_FIELD).asInstanceOf[Int]
+      if (version == CurrentVersion) {
+        val producerId = value.getLong(ProducerIdField)
+        val epoch = value.getShort(ProducerEpochField)
+        val timeout = value.getInt(TxnTimeoutField)
 
-        val stateByte = value.getByte(VALUE_SCHEMA_TXN_STATUS_FIELD)
+        val stateByte = value.getByte(TxnStatusField)
         val state = TransactionMetadata.byteToState(stateByte)
-        val entryTimestamp = value.get(VALUE_SCHEMA_TXN_ENTRY_TIMESTAMP_FIELD).asInstanceOf[Long]
-        val startTimestamp = value.get(VALUE_SCHEMA_TXN_START_TIMESTAMP_FIELD).asInstanceOf[Long]
+        val entryTimestamp = value.getLong(TxnEntryTimestampField)
+        val startTimestamp = value.getLong(TxnStartTimestampField)
 
-        val transactionMetadata = new TransactionMetadata(pid, epoch, timeout, state, mutable.Set.empty[TopicPartition],startTimestamp, entryTimestamp)
+        val transactionMetadata = new TransactionMetadata(producerId, epoch, timeout, state, mutable.Set.empty[TopicPartition],startTimestamp, entryTimestamp)
 
         if (!state.equals(Empty)) {
-          val topicPartitionArray = value.getArray(VALUE_SCHEMA_TXN_PARTITIONS_FIELD)
+          val topicPartitionArray = value.getArray(TxnPartitionsField)
 
           topicPartitionArray.foreach { memberMetadataObj =>
             val memberMetadata = memberMetadataObj.asInstanceOf[Struct]
-            val topic = memberMetadata.get(PARTITIONS_SCHEMA_TOPIC_FIELD).asInstanceOf[String]
-            val partitionIdArray = memberMetadata.getArray(PARTITIONS_SCHEMA_PARTITION_IDS_FIELD)
+            val topic = memberMetadata.getString(PartitionsTopicField)
+            val partitionIdArray = memberMetadata.getArray(PartitionIdsField)
 
             val topicPartitions = partitionIdArray.map { partitionIdObj =>
               val partitionId = partitionIdObj.asInstanceOf[Integer]
@@ -252,12 +259,12 @@ object TransactionLog {
         case txnKey: TxnKey =>
           val transactionalId = txnKey.transactionalId
           val value = consumerRecord.value
-          val pidMetadata =
+          val producerIdMetadata =
             if (value == null) "NULL"
             else readMessageValue(ByteBuffer.wrap(value))
           output.write(transactionalId.getBytes(StandardCharsets.UTF_8))
           output.write("::".getBytes(StandardCharsets.UTF_8))
-          output.write(pidMetadata.toString.getBytes(StandardCharsets.UTF_8))
+          output.write(producerIdMetadata.toString.getBytes(StandardCharsets.UTF_8))
           output.write("\n".getBytes(StandardCharsets.UTF_8))
         case _ => // no-op
       }
@@ -265,7 +272,7 @@ object TransactionLog {
   }
 }
 
-trait BaseKey{
+sealed trait BaseKey {
   def version: Short
   def transactionalId: Any
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/3e666900/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerRequestCompletionHandler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerRequestCompletionHandler.scala b/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerRequestCompletionHandler.scala
index 5978a97..39c7914 100644
--- a/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerRequestCompletionHandler.scala
+++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerRequestCompletionHandler.scala
@@ -56,7 +56,7 @@ class TransactionMarkerRequestCompletionHandler(brokerId: Int,
         val errors = writeTxnMarkerResponse.errors(txnMarker.producerId)
 
         if (errors == null)
-          throw new IllegalStateException(s"WriteTxnMarkerResponse does not contain expected error map for pid ${txnMarker.producerId}")
+          throw new IllegalStateException(s"WriteTxnMarkerResponse does not contain expected error map for producer id ${txnMarker.producerId}")
 
         txnStateManager.getTransactionState(transactionalId) match {
           case None =>

http://git-wip-us.apache.org/repos/asf/kafka/blob/3e666900/core/src/main/scala/kafka/coordinator/transaction/TransactionMetadata.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionMetadata.scala b/core/src/main/scala/kafka/coordinator/transaction/TransactionMetadata.scala
index a76617e..d05676b 100644
--- a/core/src/main/scala/kafka/coordinator/transaction/TransactionMetadata.scala
+++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionMetadata.scala
@@ -70,9 +70,9 @@ private[transaction] case object CompleteCommit extends TransactionState { val b
 private[transaction] case object CompleteAbort extends TransactionState { val byte: Byte = 5 }
 
 private[transaction] object TransactionMetadata {
-  def apply(pid: Long, epoch: Short, txnTimeoutMs: Int, timestamp: Long) = new TransactionMetadata(pid, epoch, txnTimeoutMs, Empty, collection.mutable.Set.empty[TopicPartition], timestamp, timestamp)
+  def apply(producerId: Long, epoch: Short, txnTimeoutMs: Int, timestamp: Long) = new TransactionMetadata(producerId, epoch, txnTimeoutMs, Empty, collection.mutable.Set.empty[TopicPartition], timestamp, timestamp)
 
-  def apply(pid: Long, epoch: Short, txnTimeoutMs: Int, state: TransactionState, timestamp: Long) = new TransactionMetadata(pid, epoch, txnTimeoutMs, state, collection.mutable.Set.empty[TopicPartition], timestamp, timestamp)
+  def apply(producerId: Long, epoch: Short, txnTimeoutMs: Int, state: TransactionState, timestamp: Long) = new TransactionMetadata(producerId, epoch, txnTimeoutMs, state, collection.mutable.Set.empty[TopicPartition], timestamp, timestamp)
 
   def byteToState(byte: Byte): TransactionState = {
     byte match {
@@ -212,7 +212,7 @@ private[transaction] class TransactionMetadata(val producerId: Long,
     // metadata transition is valid only if all the following conditions are met:
     //
     // 1. the new state is already indicated in the pending state.
-    // 2. the pid is the same (i.e. this field should never be changed)
+    // 2. the producerId is the same (i.e. this field should never be changed)
     // 3. the epoch should be either the same value or old value + 1.
     // 4. the last update time is no smaller than the old value.
     // 4. the old partitions set is a subset of the new partitions set.

http://git-wip-us.apache.org/repos/asf/kafka/blob/3e666900/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala b/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
index 1106e7c..cf41fc3 100644
--- a/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
+++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
@@ -104,7 +104,7 @@ class TransactionStateManager(brokerId: Int,
   }
 
   def enablePidExpiration() {
-    // TODO: add pid expiration logic
+    // TODO: add producer id expiration logic
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/kafka/blob/3e666900/core/src/main/scala/kafka/log/Log.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index f499aa8..a4796d1 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -114,8 +114,8 @@ case class CompletedTxn(producerId: Long, firstOffset: Long, lastOffset: Long, i
  * @param recoveryPoint The offset at which to begin recovery--i.e. the first offset which has not been flushed to disk
  * @param scheduler The thread pool scheduler used for background actions
  * @param time The time instance used for checking the clock
- * @param maxPidExpirationMs The maximum amount of time to wait before a PID is considered expired
- * @param pidExpirationCheckIntervalMs How often to check for PIDs which need to be expired
+ * @param maxProducerIdExpirationMs The maximum amount of time to wait before a producer id is considered expired
+ * @param producerIdExpirationCheckIntervalMs How often to check for producer ids which need to be expired
  */
 @threadsafe
 class Log(@volatile var dir: File,
@@ -124,8 +124,8 @@ class Log(@volatile var dir: File,
           @volatile var recoveryPoint: Long = 0L,
           scheduler: Scheduler,
           time: Time = Time.SYSTEM,
-          val maxPidExpirationMs: Int = 60 * 60 * 1000,
-          val pidExpirationCheckIntervalMs: Int = 10 * 60 * 1000) extends Logging with KafkaMetricsGroup {
+          val maxProducerIdExpirationMs: Int = 60 * 60 * 1000,
+          val producerIdExpirationCheckIntervalMs: Int = 10 * 60 * 1000) extends Logging with KafkaMetricsGroup {
 
   import kafka.log.Log._
 
@@ -149,7 +149,7 @@ class Log(@volatile var dir: File,
   /* The earliest offset which is part of an incomplete transaction. This is used to compute the LSO. */
   @volatile var firstUnstableOffset: Option[LogOffsetMetadata] = None
 
-  private val producerStateManager = new ProducerStateManager(topicPartition, dir, maxPidExpirationMs)
+  private val producerStateManager = new ProducerStateManager(topicPartition, dir, maxProducerIdExpirationMs)
 
   /* the actual segments of the log */
   private val segments: ConcurrentNavigableMap[java.lang.Long, LogSegment] = new ConcurrentSkipListMap[java.lang.Long, LogSegment]
@@ -207,7 +207,7 @@ class Log(@volatile var dir: File,
     lock synchronized {
       producerStateManager.removeExpiredProducers(time.milliseconds)
     }
-  }, period = pidExpirationCheckIntervalMs, unit = TimeUnit.MILLISECONDS)
+  }, period = producerIdExpirationCheckIntervalMs, unit = TimeUnit.MILLISECONDS)
 
   /** The name of this log */
   def name  = dir.getName()
@@ -306,7 +306,7 @@ class Log(@volatile var dir: File,
   }
 
   private def recoverSegment(segment: LogSegment, leaderEpochCache: Option[LeaderEpochCache] = None): Int = lock synchronized {
-    val stateManager = new ProducerStateManager(topicPartition, dir, maxPidExpirationMs)
+    val stateManager = new ProducerStateManager(topicPartition, dir, maxProducerIdExpirationMs)
     stateManager.truncateAndReload(logStartOffset, segment.baseOffset, time.milliseconds)
     logSegments(stateManager.mapEndOffset, segment.baseOffset).foreach { segment =>
       val startOffset = math.max(segment.baseOffset, stateManager.mapEndOffset)
@@ -625,7 +625,7 @@ class Log(@volatile var dir: File,
           segment.updateTxnIndex(completedTxn, lastStableOffset)
         }
 
-        // always update the last pid map offset so that the snapshot reflects the current offset
+        // always update the last producer id map offset so that the snapshot reflects the current offset
         // even if there isn't any idempotent data being written
         producerStateManager.updateMapEndOffset(appendInfo.lastOffset + 1)
 
@@ -779,8 +779,8 @@ class Log(@volatile var dir: File,
                               completedTxns: ListBuffer[CompletedTxn],
                               lastEntry: Option[ProducerIdEntry],
                               loadingFromLog: Boolean): Unit = {
-    val pid = batch.producerId
-    val appendInfo = producers.getOrElseUpdate(pid, new ProducerAppendInfo(pid, lastEntry, loadingFromLog))
+    val producerId = batch.producerId
+    val appendInfo = producers.getOrElseUpdate(producerId, new ProducerAppendInfo(producerId, lastEntry, loadingFromLog))
     val shouldValidateSequenceNumbers = topicPartition.topic != Topic.GROUP_METADATA_TOPIC_NAME
     val maybeCompletedTxn = appendInfo.append(batch, shouldValidateSequenceNumbers)
     maybeCompletedTxn.foreach(completedTxns += _)
@@ -1551,7 +1551,7 @@ object Log {
     new File(dir, filenamePrefixFromOffset(offset) + TimeIndexFileSuffix)
 
   /**
-   * Construct a PID snapshot file using the given offset.
+   * Construct a producer id snapshot file using the given offset.
    *
    * @param dir The directory in which the log will reside
    * @param offset The last offset (exclusive) included in the snapshot

http://git-wip-us.apache.org/repos/asf/kafka/blob/3e666900/core/src/main/scala/kafka/log/LogManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala
index af771f1..4ce4716 100755
--- a/core/src/main/scala/kafka/log/LogManager.scala
+++ b/core/src/main/scala/kafka/log/LogManager.scala
@@ -173,7 +173,7 @@ class LogManager(val logDirs: Array[File],
             config = config,
             logStartOffset = logStartOffset,
             recoveryPoint = logRecoveryPoint,
-            maxPidExpirationMs = maxPidExpirationMs,
+            maxProducerIdExpirationMs = maxPidExpirationMs,
             scheduler = scheduler,
             time = time)
           if (logDir.getName.endsWith(Log.DeleteDirSuffix)) {
@@ -414,7 +414,7 @@ class LogManager(val logDirs: Array[File],
           config = config,
           logStartOffset = 0L,
           recoveryPoint = 0L,
-          maxPidExpirationMs = maxPidExpirationMs,
+          maxProducerIdExpirationMs = maxPidExpirationMs,
           scheduler = scheduler,
           time = time)
         logs.put(topicPartition, log)

http://git-wip-us.apache.org/repos/asf/kafka/blob/3e666900/core/src/main/scala/kafka/log/ProducerStateManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/ProducerStateManager.scala b/core/src/main/scala/kafka/log/ProducerStateManager.scala
index d7b1c33..02609b2 100644
--- a/core/src/main/scala/kafka/log/ProducerStateManager.scala
+++ b/core/src/main/scala/kafka/log/ProducerStateManager.scala
@@ -122,7 +122,7 @@ private[log] class ProducerAppendInfo(val producerId: Long, initialEntry: Produc
              shouldValidateSequenceNumbers: Boolean): Unit = {
     if (epoch != RecordBatch.NO_PRODUCER_EPOCH && !loadingFromLog)
       // skip validation if this is the first entry when loading from the log. Log retention
-      // will generally have removed the beginning entries from each PID
+      // will generally have removed the beginning entries from each producer id
       validateAppend(epoch, firstSeq, lastSeq, shouldValidateSequenceNumbers)
 
     this.producerEpoch = epoch
@@ -303,18 +303,18 @@ object ProducerStateManager {
 }
 
 /**
- * Maintains a mapping from ProducerIds (PIDs) to metadata about the last appended entries (e.g.
+ * Maintains a mapping from ProducerIds to metadata about the last appended entries (e.g.
  * epoch, sequence number, last offset, etc.)
  *
  * The sequence number is the last number successfully appended to the partition for the given identifier.
  * The epoch is used for fencing against zombie writers. The offset is the one of the last successful message
  * appended to the partition.
  *
- * As long as a PID is contained in the map, the corresponding producer can continue to write data.
- * However, PIDs can be expired due to lack of recent use or if the last written entry has been deleted from
+ * As long as a producer id is contained in the map, the corresponding producer can continue to write data.
+ * However, producer ids can be expired due to lack of recent use or if the last written entry has been deleted from
  * the log (e.g. if the retention policy is "delete"). For compacted topics, the log cleaner will ensure
- * that the most recent entry from a given PID is retained in the log provided it hasn't expired due to
- * age. This ensures that PIDs will not be expired until either the max expiration time has been reached,
+ * that the most recent entry from a given producer id is retained in the log provided it hasn't expired due to
+ * age. This ensures that producer ids will not be expired until either the max expiration time has been reached,
  * or if the topic also is configured for deletion, the segment containing the last written offset has
  * been deleted.
  */
@@ -415,7 +415,7 @@ class ProducerStateManager(val topicPartition: TopicPartition,
     producerIdEntry.currentTxnFirstOffset.isEmpty && currentTimeMs - producerIdEntry.timestamp >= maxPidExpirationMs
 
   /**
-   * Expire any PIDs which have been idle longer than the configured maximum expiration timeout.
+   * Expire any producer ids which have been idle longer than the configured maximum expiration timeout.
    */
   def removeExpiredProducers(currentTimeMs: Long) {
     producers.retain { case (producerId, lastEntry) =>
@@ -424,7 +424,7 @@ class ProducerStateManager(val topicPartition: TopicPartition,
   }
 
   /**
-   * Truncate the PID mapping to the given offset range and reload the entries from the most recent
+   * Truncate the producer id mapping to the given offset range and reload the entries from the most recent
    * snapshot in range (if there is one). Note that the log end offset is assumed to be less than
    * or equal to the high watermark.
    */
@@ -451,7 +451,7 @@ class ProducerStateManager(val topicPartition: TopicPartition,
    */
   def update(appendInfo: ProducerAppendInfo): Unit = {
     if (appendInfo.producerId == RecordBatch.NO_PRODUCER_ID)
-      throw new IllegalArgumentException("Invalid PID passed to update")
+      throw new IllegalArgumentException(s"Invalid producer id ${appendInfo.producerId} passed to update")
 
     val entry = appendInfo.lastEntry
     producers.put(appendInfo.producerId, entry)
@@ -465,7 +465,7 @@ class ProducerStateManager(val topicPartition: TopicPartition,
   }
 
   /**
-   * Get the last written entry for the given PID.
+   * Get the last written entry for the given producer id.
    */
   def lastEntry(producerId: Long): Option[ProducerIdEntry] = producers.get(producerId)
 
@@ -532,7 +532,7 @@ class ProducerStateManager(val topicPartition: TopicPartition,
   }
 
   /**
-   * Truncate the PID mapping and remove all snapshots. This resets the state of the mapping.
+   * Truncate the producer id mapping and remove all snapshots. This resets the state of the mapping.
    */
   def truncate() {
     producers.clear()

http://git-wip-us.apache.org/repos/asf/kafka/blob/3e666900/core/src/main/scala/kafka/server/KafkaConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 5ee4b12..99eddab 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -592,7 +592,7 @@ object KafkaConfig {
   val TransactionsMaxTimeoutMsDoc = "The maximum allowed timeout for transactions. " +
     "If a client’s requested transaction time exceed this, then the broker will return an error in InitProducerIdRequest. This prevents a client from too large of a timeout, which can stall consumers reading from topics included in the transaction."
   val TransactionsTopicMinISRDoc = "Overridden " + MinInSyncReplicasProp + " config for the transaction topic."
-  val TransactionsLoadBufferSizeDoc = "Batch size for reading from the transaction log segments when loading pid and transactions into the cache."
+  val TransactionsLoadBufferSizeDoc = "Batch size for reading from the transaction log segments when loading producer ids and transactions into the cache."
   val TransactionsTopicReplicationFactorDoc = "The replication factor for the transaction topic (set higher to ensure availability). " +
     "Internal topic creation will fail until the cluster size meets this replication factor requirement."
   val TransactionsTopicPartitionsDoc = "The number of partitions for the transaction topic (should not change after deployment)."
@@ -610,9 +610,9 @@ object KafkaConfig {
   val ReplicationQuotaWindowSizeSecondsDoc = "The time span of each sample for replication quotas"
   /** ********* Transaction Configuration ***********/
   val TransactionIdExpirationMsDoc = "The maximum time of inactivity before a transactional id is expired by the " +
-    "transaction coordinator. Note that this also influences PID expiration: PIDs are guaranteed to expire " +
-    "after expiration of this timeout from the last write by the PID (they may expire sooner if the last write " +
-    "from the PID is deleted due to the topic's retention settings)."
+    "transaction coordinator. Note that this also influences producer id expiration: Producer ids are guaranteed to expire " +
+    "after expiration of this timeout from the last write by the producer id (they may expire sooner if the last write " +
+    "from the producer id is deleted due to the topic's retention settings)."
 
   val DeleteTopicEnableDoc = "Enables delete topic. Delete topic through the admin tool will have no effect if this config is turned off"
   val CompressionTypeDoc = "Specify the final compression type for a given topic. This configuration accepts the standard compression codecs " +

http://git-wip-us.apache.org/repos/asf/kafka/blob/3e666900/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala b/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala
index eab3258..bcf2b58 100644
--- a/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala
+++ b/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala
@@ -54,36 +54,36 @@ object ConsumerOffsetChecker extends Logging {
   }
 
   private def processPartition(zkUtils: ZkUtils,
-                               group: String, topic: String, pid: Int) {
-    val topicPartition = TopicAndPartition(topic, pid)
+                               group: String, topic: String, producerId: Int) {
+    val topicPartition = TopicAndPartition(topic, producerId)
     val offsetOpt = offsetMap.get(topicPartition)
     val groupDirs = new ZKGroupTopicDirs(group, topic)
-    val owner = zkUtils.readDataMaybeNull(groupDirs.consumerOwnerDir + "/%s".format(pid))._1
-    zkUtils.getLeaderForPartition(topic, pid) match {
+    val owner = zkUtils.readDataMaybeNull(groupDirs.consumerOwnerDir + "/%s".format(producerId))._1
+    zkUtils.getLeaderForPartition(topic, producerId) match {
       case Some(bid) =>
         val consumerOpt = consumerMap.getOrElseUpdate(bid, getConsumer(zkUtils, bid))
         consumerOpt match {
           case Some(consumer) =>
-            val topicAndPartition = TopicAndPartition(topic, pid)
+            val topicAndPartition = TopicAndPartition(topic, producerId)
             val request =
               OffsetRequest(immutable.Map(topicAndPartition -> PartitionOffsetRequestInfo(OffsetRequest.LatestTime, 1)))
             val logSize = consumer.getOffsetsBefore(request).partitionErrorAndOffsets(topicAndPartition).offsets.head
 
             val lagString = offsetOpt.map(o => if (o == -1) "unknown" else (logSize - o).toString)
-            println("%-15s %-30s %-3s %-15s %-15s %-15s %s".format(group, topic, pid, offsetOpt.getOrElse("unknown"), logSize, lagString.getOrElse("unknown"),
+            println("%-15s %-30s %-3s %-15s %-15s %-15s %s".format(group, topic, producerId, offsetOpt.getOrElse("unknown"), logSize, lagString.getOrElse("unknown"),
                                                                    owner match {case Some(ownerStr) => ownerStr case None => "none"}))
           case None => // ignore
         }
       case None =>
-        println("No broker for partition %s - %s".format(topic, pid))
+        println("No broker for partition %s - %s".format(topic, producerId))
     }
   }
 
   private def processTopic(zkUtils: ZkUtils, group: String, topic: String) {
     topicPidMap.get(topic) match {
-      case Some(pids) =>
-        pids.sorted.foreach {
-          pid => processPartition(zkUtils, group, topic, pid)
+      case Some(producerIds) =>
+        producerIds.sorted.foreach {
+          producerId => processPartition(zkUtils, group, topic, producerId)
         }
       case None => // ignore
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/3e666900/core/src/main/scala/kafka/tools/DumpLogSegments.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/DumpLogSegments.scala b/core/src/main/scala/kafka/tools/DumpLogSegments.scala
index 0b0ad7b..4d35a85 100755
--- a/core/src/main/scala/kafka/tools/DumpLogSegments.scala
+++ b/core/src/main/scala/kafka/tools/DumpLogSegments.scala
@@ -136,7 +136,7 @@ object DumpLogSegments {
   private def dumpTxnIndex(file: File): Unit = {
     val index = new TransactionIndex(Log.offsetFromFilename(file.getName), file)
     for (abortedTxn <- index.allAbortedTxns) {
-      println(s"version: ${abortedTxn.version} pid: ${abortedTxn.producerId} firstOffset: ${abortedTxn.firstOffset} " +
+      println(s"version: ${abortedTxn.version} producerId: ${abortedTxn.producerId} firstOffset: ${abortedTxn.firstOffset} " +
         s"lastOffset: ${abortedTxn.lastOffset} lastStableOffset: ${abortedTxn.lastStableOffset}")
     }
   }

http://git-wip-us.apache.org/repos/asf/kafka/blob/3e666900/core/src/main/scala/kafka/utils/ZkUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala
index c12f774..fc78501 100644
--- a/core/src/main/scala/kafka/utils/ZkUtils.scala
+++ b/core/src/main/scala/kafka/utils/ZkUtils.scala
@@ -64,7 +64,7 @@ object ZkUtils {
   val BrokerSequenceIdPath = s"$BrokersPath/seqid"
   val ConfigChangesPath = s"$ConfigPath/changes"
   val ConfigUsersPath = s"$ConfigPath/users"
-  val ProducerIdBlockPath = "/latest_pid_block"
+  val ProducerIdBlockPath = "/latest_producer_id_block"
   // Important: it is necessary to add any new top level Zookeeper path to the Seq
   val SecureZkRootPaths = Seq(AdminPath,
                               BrokersPath,

http://git-wip-us.apache.org/repos/asf/kafka/blob/3e666900/core/src/test/scala/unit/kafka/log/LogTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala
index aaef466..e545255 100755
--- a/core/src/test/scala/unit/kafka/log/LogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogTest.scala
@@ -98,7 +98,7 @@ class LogTest {
                       LogConfig(logProps),
                       logStartOffset = 0L,
                       recoveryPoint = 0L,
-                      maxPidExpirationMs = 24 * 60,
+                      maxProducerIdExpirationMs = 24 * 60,
                       scheduler = time.scheduler,
                       time = time)
     assertEquals("Log begins with a single empty segment.", 1, log.numberOfSegments)
@@ -2436,8 +2436,8 @@ class LogTest {
       recoveryPoint = 0L,
       scheduler = time.scheduler,
       time = time,
-      maxPidExpirationMs = maxPidExpirationMs,
-      pidExpirationCheckIntervalMs = pidExpirationCheckIntervalMs)
+      maxProducerIdExpirationMs = maxPidExpirationMs,
+      producerIdExpirationCheckIntervalMs = pidExpirationCheckIntervalMs)
     log
   }