You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2017/05/13 03:02:22 UTC

[1/2] kafka git commit: MINOR: Rename InitPidRequest/InitPidResponse to InitProducerIdRequest/InitProducerIdResponse

Repository: kafka
Updated Branches:
  refs/heads/trunk 1cb39f757 -> a1c8e7d94


http://git-wip-us.apache.org/repos/asf/kafka/blob/a1c8e7d9/core/src/main/scala/kafka/log/ProducerStateManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/ProducerStateManager.scala b/core/src/main/scala/kafka/log/ProducerStateManager.scala
index 4b2cedb..d7b1c33 100644
--- a/core/src/main/scala/kafka/log/ProducerStateManager.scala
+++ b/core/src/main/scala/kafka/log/ProducerStateManager.scala
@@ -73,28 +73,28 @@ private[log] class ProducerAppendInfo(val producerId: Long, initialEntry: Produc
   private var coordinatorEpoch = initialEntry.coordinatorEpoch
   private val transactions = ListBuffer.empty[TxnMetadata]
 
-  def this(pid: Long, initialEntry: Option[ProducerIdEntry], loadingFromLog: Boolean) =
-    this(pid, initialEntry.getOrElse(ProducerIdEntry.Empty), loadingFromLog)
+  def this(producerId: Long, initialEntry: Option[ProducerIdEntry], loadingFromLog: Boolean) =
+    this(producerId, initialEntry.getOrElse(ProducerIdEntry.Empty), loadingFromLog)
 
-  private def validateAppend(epoch: Short, firstSeq: Int, lastSeq: Int, shouldValidateSequenceNumbers: Boolean) = {
-    if (this.producerEpoch > epoch) {
+  private def validateAppend(producerEpoch: Short, firstSeq: Int, lastSeq: Int, shouldValidateSequenceNumbers: Boolean) = {
+    if (this.producerEpoch > producerEpoch) {
       throw new ProducerFencedException(s"Producer's epoch is no longer valid. There is probably another producer " +
-        s"with a newer epoch. $epoch (request epoch), ${this.producerEpoch} (server epoch)")
+        s"with a newer epoch. $producerEpoch (request epoch), ${this.producerEpoch} (server epoch)")
     } else if (shouldValidateSequenceNumbers) {
-      if (this.producerEpoch == RecordBatch.NO_PRODUCER_EPOCH || this.producerEpoch < epoch) {
+      if (this.producerEpoch == RecordBatch.NO_PRODUCER_EPOCH || this.producerEpoch < producerEpoch) {
         if (firstSeq != 0)
-          throw new OutOfOrderSequenceException(s"Invalid sequence number for new epoch: $epoch " +
+          throw new OutOfOrderSequenceException(s"Invalid sequence number for new epoch: $producerEpoch " +
             s"(request epoch), $firstSeq (seq. number)")
       } else if (this.firstSeq == RecordBatch.NO_SEQUENCE && firstSeq != 0) {
         // the epoch was bumped by a control record, so we expect the sequence number to be reset
-        throw new OutOfOrderSequenceException(s"Out of order sequence number: $producerId (pid), found $firstSeq " +
+        throw new OutOfOrderSequenceException(s"Out of order sequence number for producerId $producerId: found $firstSeq " +
           s"(incoming seq. number), but expected 0")
       } else if (firstSeq == this.firstSeq && lastSeq == this.lastSeq) {
-        throw new DuplicateSequenceNumberException(s"Duplicate sequence number: pid: $producerId, (incomingBatch.firstSeq, " +
+        throw new DuplicateSequenceNumberException(s"Duplicate sequence number for producerId $producerId: (incomingBatch.firstSeq, " +
           s"incomingBatch.lastSeq): ($firstSeq, $lastSeq), (lastEntry.firstSeq, lastEntry.lastSeq): " +
           s"(${this.firstSeq}, ${this.lastSeq}).")
       } else if (firstSeq != this.lastSeq + 1L) {
-        throw new OutOfOrderSequenceException(s"Out of order sequence number: $producerId (pid), $firstSeq " +
+        throw new OutOfOrderSequenceException(s"Out of order sequence number for producerId $producerId: $firstSeq " +
           s"(incoming seq. number), ${this.lastSeq} (current end sequence number)")
       }
     }
@@ -202,25 +202,25 @@ private[log] class ProducerAppendInfo(val producerId: Long, initialEntry: Produc
 }
 
 object ProducerStateManager {
-  private val PidSnapshotVersion: Short = 1
+  private val ProducerSnapshotVersion: Short = 1
   private val VersionField = "version"
   private val CrcField = "crc"
-  private val PidField = "pid"
+  private val ProducerIdField = "producer_id"
   private val LastSequenceField = "last_sequence"
   private val ProducerEpochField = "epoch"
   private val LastOffsetField = "last_offset"
   private val OffsetDeltaField = "offset_delta"
   private val TimestampField = "timestamp"
-  private val PidEntriesField = "pid_entries"
+  private val ProducerEntriesField = "producer_entries"
   private val CoordinatorEpochField = "coordinator_epoch"
   private val CurrentTxnFirstOffsetField = "current_txn_first_offset"
 
   private val VersionOffset = 0
   private val CrcOffset = VersionOffset + 2
-  private val PidEntriesOffset = CrcOffset + 4
+  private val ProducerEntriesOffset = CrcOffset + 4
 
-  val PidSnapshotEntrySchema = new Schema(
-    new Field(PidField, Type.INT64, "The producer ID"),
+  val ProducerSnapshotEntrySchema = new Schema(
+    new Field(ProducerIdField, Type.INT64, "The producer ID"),
     new Field(ProducerEpochField, Type.INT16, "Current epoch of the producer"),
     new Field(LastSequenceField, Type.INT32, "Last written sequence of the producer"),
     new Field(LastOffsetField, Type.INT64, "Last written offset of the producer"),
@@ -231,33 +231,33 @@ object ProducerStateManager {
   val PidSnapshotMapSchema = new Schema(
     new Field(VersionField, Type.INT16, "Version of the snapshot file"),
     new Field(CrcField, Type.UNSIGNED_INT32, "CRC of the snapshot data"),
-    new Field(PidEntriesField, new ArrayOf(PidSnapshotEntrySchema), "The entries in the PID table"))
+    new Field(ProducerEntriesField, new ArrayOf(ProducerSnapshotEntrySchema), "The entries in the producer table"))
 
   def readSnapshot(file: File): Iterable[ProducerIdEntry] = {
     val buffer = Files.readAllBytes(file.toPath)
     val struct = PidSnapshotMapSchema.read(ByteBuffer.wrap(buffer))
 
     val version = struct.getShort(VersionField)
-    if (version != PidSnapshotVersion)
+    if (version != ProducerSnapshotVersion)
       throw new IllegalArgumentException(s"Unhandled snapshot file version $version")
 
     val crc = struct.getUnsignedInt(CrcField)
-    val computedCrc =  Crc32C.compute(buffer, PidEntriesOffset, buffer.length - PidEntriesOffset)
+    val computedCrc =  Crc32C.compute(buffer, ProducerEntriesOffset, buffer.length - ProducerEntriesOffset)
     if (crc != computedCrc)
       throw new CorruptSnapshotException(s"Snapshot file '$file' is corrupted (CRC is no longer valid). " +
         s"Stored crc: $crc. Computed crc: $computedCrc")
 
-    struct.getArray(PidEntriesField).map { pidEntryObj =>
-      val pidEntryStruct = pidEntryObj.asInstanceOf[Struct]
-      val pid: Long = pidEntryStruct.getLong(PidField)
-      val epoch = pidEntryStruct.getShort(ProducerEpochField)
-      val seq = pidEntryStruct.getInt(LastSequenceField)
-      val offset = pidEntryStruct.getLong(LastOffsetField)
-      val timestamp = pidEntryStruct.getLong(TimestampField)
-      val offsetDelta = pidEntryStruct.getInt(OffsetDeltaField)
-      val coordinatorEpoch = pidEntryStruct.getInt(CoordinatorEpochField)
-      val currentTxnFirstOffset = pidEntryStruct.getLong(CurrentTxnFirstOffsetField)
-      val newEntry = ProducerIdEntry(pid, epoch, seq, offset, offsetDelta, timestamp,
+    struct.getArray(ProducerEntriesField).map { producerEntryObj =>
+      val producerEntryStruct = producerEntryObj.asInstanceOf[Struct]
+      val producerId: Long = producerEntryStruct.getLong(ProducerIdField)
+      val producerEpoch = producerEntryStruct.getShort(ProducerEpochField)
+      val seq = producerEntryStruct.getInt(LastSequenceField)
+      val offset = producerEntryStruct.getLong(LastOffsetField)
+      val timestamp = producerEntryStruct.getLong(TimestampField)
+      val offsetDelta = producerEntryStruct.getInt(OffsetDeltaField)
+      val coordinatorEpoch = producerEntryStruct.getInt(CoordinatorEpochField)
+      val currentTxnFirstOffset = producerEntryStruct.getLong(CurrentTxnFirstOffsetField)
+      val newEntry = ProducerIdEntry(producerId, producerEpoch, seq, offset, offsetDelta, timestamp,
         coordinatorEpoch, if (currentTxnFirstOffset >= 0) Some(currentTxnFirstOffset) else None)
       newEntry
     }
@@ -265,12 +265,12 @@ object ProducerStateManager {
 
   private def writeSnapshot(file: File, entries: mutable.Map[Long, ProducerIdEntry]) {
     val struct = new Struct(PidSnapshotMapSchema)
-    struct.set(VersionField, PidSnapshotVersion)
+    struct.set(VersionField, ProducerSnapshotVersion)
     struct.set(CrcField, 0L) // we'll fill this after writing the entries
     val entriesArray = entries.map {
-      case (pid, entry) =>
-        val pidEntryStruct = struct.instance(PidEntriesField)
-        pidEntryStruct.set(PidField, pid)
+      case (producerId, entry) =>
+        val producerEntryStruct = struct.instance(ProducerEntriesField)
+        producerEntryStruct.set(ProducerIdField, producerId)
           .set(ProducerEpochField, entry.producerEpoch)
           .set(LastSequenceField, entry.lastSeq)
           .set(LastOffsetField, entry.lastOffset)
@@ -278,16 +278,16 @@ object ProducerStateManager {
           .set(TimestampField, entry.timestamp)
           .set(CoordinatorEpochField, entry.coordinatorEpoch)
           .set(CurrentTxnFirstOffsetField, entry.currentTxnFirstOffset.getOrElse(-1L))
-        pidEntryStruct
+        producerEntryStruct
     }.toArray
-    struct.set(PidEntriesField, entriesArray)
+    struct.set(ProducerEntriesField, entriesArray)
 
     val buffer = ByteBuffer.allocate(struct.sizeOf)
     struct.writeTo(buffer)
     buffer.flip()
 
     // now fill in the CRC
-    val crc = Crc32C.compute(buffer, PidEntriesOffset, buffer.limit - PidEntriesOffset)
+    val crc = Crc32C.compute(buffer, ProducerEntriesOffset, buffer.limit - ProducerEntriesOffset)
     ByteUtils.writeUnsignedInt(buffer, CrcOffset, crc)
 
     val fos = new FileOutputStream(file)
@@ -404,10 +404,10 @@ class ProducerStateManager(val topicPartition: TopicPartition,
 
   // visible for testing
   private[log] def loadProducerEntry(entry: ProducerIdEntry): Unit = {
-    val pid = entry.producerId
-    producers.put(pid, entry)
+    val producerId = entry.producerId
+    producers.put(producerId, entry)
     entry.currentTxnFirstOffset.foreach { offset =>
-      ongoingTxns.put(offset, new TxnMetadata(pid, offset))
+      ongoingTxns.put(offset, new TxnMetadata(producerId, offset))
     }
   }
 
@@ -418,7 +418,7 @@ class ProducerStateManager(val topicPartition: TopicPartition,
    * Expire any PIDs which have been idle longer than the configured maximum expiration timeout.
    */
   def removeExpiredProducers(currentTimeMs: Long) {
-    producers.retain { case (pid, lastEntry) =>
+    producers.retain { case (producerId, lastEntry) =>
       !isExpired(currentTimeMs, lastEntry)
     }
   }
@@ -496,7 +496,7 @@ class ProducerStateManager(val topicPartition: TopicPartition,
 
   /**
    * When we remove the head of the log due to retention, we need to clean up the id map. This method takes
-   * the new start offset and removes all pids which have a smaller last written offset.
+   * the new start offset and removes all producerIds which have a smaller last written offset.
    */
   def evictUnretainedProducers(logStartOffset: Long) {
     val evictedProducerEntries = producers.filter(_._2.lastOffset < logStartOffset)

http://git-wip-us.apache.org/repos/asf/kafka/blob/a1c8e7d9/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index aaa2458..1f8bea5 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -32,7 +32,7 @@ import kafka.common.Topic.{GroupMetadataTopicName, TransactionStateTopicName, is
 import kafka.server.QuotaFactory.{QuotaManagers, UnboundedQuota}
 import kafka.controller.KafkaController
 import kafka.coordinator.group.{GroupCoordinator, JoinGroupResult}
-import kafka.coordinator.transaction.{InitPidResult, TransactionCoordinator}
+import kafka.coordinator.transaction.{InitProducerIdResult, TransactionCoordinator}
 import kafka.log.{Log, LogManager, TimestampOffset}
 import kafka.network.{RequestChannel, RequestOrResponseSend}
 import kafka.network.RequestChannel.{Response, Session}
@@ -110,7 +110,7 @@ class KafkaApis(val requestChannel: RequestChannel,
         case ApiKeys.CREATE_TOPICS => handleCreateTopicsRequest(request)
         case ApiKeys.DELETE_TOPICS => handleDeleteTopicsRequest(request)
         case ApiKeys.DELETE_RECORDS => handleDeleteRecordsRequest(request)
-        case ApiKeys.INIT_PRODUCER_ID => handleInitPidRequest(request)
+        case ApiKeys.INIT_PRODUCER_ID => handleInitProducerIdRequest(request)
         case ApiKeys.OFFSET_FOR_LEADER_EPOCH => handleOffsetForLeaderEpochRequest(request)
         case ApiKeys.ADD_PARTITIONS_TO_TXN => handleAddPartitionToTxnRequest(request)
         case ApiKeys.ADD_OFFSETS_TO_TXN => handleAddOffsetsToTxnRequest(request)
@@ -1386,20 +1386,20 @@ class KafkaApis(val requestChannel: RequestChannel,
     }
   }
 
-  def handleInitPidRequest(request: RequestChannel.Request): Unit = {
-    val initPidRequest = request.body[InitPidRequest]
-    val transactionalId = initPidRequest.transactionalId
+  def handleInitProducerIdRequest(request: RequestChannel.Request): Unit = {
+    val initProducerIdRequest = request.body[InitProducerIdRequest]
+    val transactionalId = initProducerIdRequest.transactionalId
 
     // Send response callback
-    def sendResponseCallback(result: InitPidResult): Unit = {
+    def sendResponseCallback(result: InitProducerIdResult): Unit = {
       def createResponse(throttleTimeMs: Int): AbstractResponse = {
-        val responseBody: InitPidResponse = new InitPidResponse(throttleTimeMs, result.error, result.pid, result.epoch)
-        trace(s"InitPidRequest: Completed $transactionalId's InitPidRequest with result $result from client ${request.header.clientId}.")
+        val responseBody = new InitProducerIdResponse(throttleTimeMs, result.error, result.producerId, result.producerEpoch)
+        trace(s"Completed $transactionalId's InitProducerIdRequest with result $result from client ${request.header.clientId}.")
         responseBody
       }
       sendResponseMaybeThrottle(request, createResponse)
     }
-    txnCoordinator.handleInitPid(transactionalId, initPidRequest.transactionTimeoutMs, sendResponseCallback)
+    txnCoordinator.handleInitProducerId(transactionalId, initProducerIdRequest.transactionTimeoutMs, sendResponseCallback)
   }
 
   def handleEndTxnRequest(request: RequestChannel.Request): Unit = {
@@ -1408,7 +1408,7 @@ class KafkaApis(val requestChannel: RequestChannel,
     def sendResponseCallback(error: Errors) {
       def createResponse(throttleTimeMs: Int): AbstractResponse = {
         val responseBody = new EndTxnResponse(throttleTimeMs, error)
-        trace(s"Completed ${endTxnRequest.transactionalId()}'s EndTxnRequest with command: ${endTxnRequest.command()}, errors: $error from client ${request.header.clientId}.")
+        trace(s"Completed ${endTxnRequest.transactionalId}'s EndTxnRequest with command: ${endTxnRequest.command}, errors: $error from client ${request.header.clientId}.")
         responseBody
       }
       sendResponseMaybeThrottle(request, createResponse)
@@ -1433,23 +1433,22 @@ class KafkaApis(val requestChannel: RequestChannel,
       return
     }
 
-    def sendResponseCallback(pid: Long, result: TransactionResult)(responseStatus: Map[TopicPartition, PartitionResponse]): Unit = {
-      errors.put(pid, responseStatus.mapValues(_.error).asJava)
+    def sendResponseCallback(producerId: Long, result: TransactionResult)(responseStatus: Map[TopicPartition, PartitionResponse]): Unit = {
+      errors.put(producerId, responseStatus.mapValues(_.error).asJava)
 
       val successfulPartitions = responseStatus.filter { case (_, partitionResponse) =>
         partitionResponse.error == Errors.NONE
       }.keys.toSeq
 
       try {
-        groupCoordinator.handleTxnCompletion(producerId = pid, topicPartitions = successfulPartitions, transactionResult = result)
+        groupCoordinator.handleTxnCompletion(producerId = producerId, topicPartitions = successfulPartitions, transactionResult = result)
       } catch {
         case e: Exception =>
           error(s"Received an exception while trying to update the offsets cache on transaction completion: $e")
-          val producerIdErrors = errors.get(pid)
+          val producerIdErrors = errors.get(producerId)
           successfulPartitions.foreach(producerIdErrors.put(_, Errors.UNKNOWN))
       }
 
-
       if (numAppends.decrementAndGet() == 0)
         sendResponseExemptThrottle(request, new RequestChannel.Response(request, new WriteTxnMarkersResponse(errors)))
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/a1c8e7d9/core/src/main/scala/kafka/server/KafkaConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 690d167..5ee4b12 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -590,7 +590,7 @@ object KafkaConfig {
   /** ********* Transaction management configuration ***********/
   val TransactionalIdExpirationMsDoc = "The maximum amount of time in ms that the transaction coordinator will wait before proactively expire a producer's transactional id without receiving any transaction status updates from it."
   val TransactionsMaxTimeoutMsDoc = "The maximum allowed timeout for transactions. " +
-    "If a client’s requested transaction time exceed this, then the broker will return an error in InitPidRequest. This prevents a client from too large of a timeout, which can stall consumers reading from topics included in the transaction."
+    "If a client’s requested transaction time exceed this, then the broker will return an error in InitProducerIdRequest. This prevents a client from too large of a timeout, which can stall consumers reading from topics included in the transaction."
   val TransactionsTopicMinISRDoc = "Overridden " + MinInSyncReplicasProp + " config for the transaction topic."
   val TransactionsLoadBufferSizeDoc = "Batch size for reading from the transaction log segments when loading pid and transactions into the cache."
   val TransactionsTopicReplicationFactorDoc = "The replication factor for the transaction topic (set higher to ensure availability). " +

http://git-wip-us.apache.org/repos/asf/kafka/blob/a1c8e7d9/core/src/main/scala/kafka/utils/ZkUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala
index 4e2b11a..c12f774 100644
--- a/core/src/main/scala/kafka/utils/ZkUtils.scala
+++ b/core/src/main/scala/kafka/utils/ZkUtils.scala
@@ -64,7 +64,7 @@ object ZkUtils {
   val BrokerSequenceIdPath = s"$BrokersPath/seqid"
   val ConfigChangesPath = s"$ConfigPath/changes"
   val ConfigUsersPath = s"$ConfigPath/users"
-  val PidBlockPath = "/latest_pid_block"
+  val ProducerIdBlockPath = "/latest_pid_block"
   // Important: it is necessary to add any new top level Zookeeper path to the Seq
   val SecureZkRootPaths = Seq(AdminPath,
                               BrokersPath,
@@ -75,7 +75,7 @@ object ZkUtils {
                               IsrChangeNotificationPath,
                               KafkaAclPath,
                               KafkaAclChangesPath,
-                              PidBlockPath)
+                              ProducerIdBlockPath)
 
   // Important: it is necessary to add any new top level Zookeeper path that contains
   //            sensitive information that should not be world readable to the Seq
@@ -239,7 +239,7 @@ class ZkUtils(val zkClient: ZkClient,
                               DeleteTopicsPath,
                               BrokerSequenceIdPath,
                               IsrChangeNotificationPath,
-                              PidBlockPath)
+                              ProducerIdBlockPath)
 
   // Visible for testing
   val zkPath = new ZkPath(zkClient)

http://git-wip-us.apache.org/repos/asf/kafka/blob/a1c8e7d9/core/src/test/scala/unit/kafka/coordinator/transaction/ProducerIdManagerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/coordinator/transaction/ProducerIdManagerTest.scala b/core/src/test/scala/unit/kafka/coordinator/transaction/ProducerIdManagerTest.scala
index 85c631c..b032f8d 100644
--- a/core/src/test/scala/unit/kafka/coordinator/transaction/ProducerIdManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/transaction/ProducerIdManagerTest.scala
@@ -68,22 +68,22 @@ class ProducerIdManagerTest {
     val manager1: ProducerIdManager = new ProducerIdManager(0, zkUtils)
     val manager2: ProducerIdManager = new ProducerIdManager(1, zkUtils)
 
-    val pid1 = manager1.nextPid()
-    val pid2 = manager2.nextPid()
+    val pid1 = manager1.generateProducerId()
+    val pid2 = manager2.generateProducerId()
 
     assertEquals(0, pid1)
     assertEquals(ProducerIdManager.PidBlockSize, pid2)
 
     for (i <- 1 until ProducerIdManager.PidBlockSize.asInstanceOf[Int]) {
-      assertEquals(pid1 + i, manager1.nextPid())
+      assertEquals(pid1 + i, manager1.generateProducerId())
     }
 
     for (i <- 1 until ProducerIdManager.PidBlockSize.asInstanceOf[Int]) {
-      assertEquals(pid2 + i, manager2.nextPid())
+      assertEquals(pid2 + i, manager2.generateProducerId())
     }
 
-    assertEquals(pid2 + ProducerIdManager.PidBlockSize, manager1.nextPid())
-    assertEquals(pid2 + ProducerIdManager.PidBlockSize * 2, manager2.nextPid())
+    assertEquals(pid2 + ProducerIdManager.PidBlockSize, manager1.generateProducerId())
+    assertEquals(pid2 + ProducerIdManager.PidBlockSize * 2, manager2.generateProducerId())
   }
 
   @Test(expected = classOf[KafkaException])
@@ -91,7 +91,7 @@ class ProducerIdManagerTest {
     EasyMock.expect(zkUtils.readDataAndVersionMaybeNull(EasyMock.anyString()))
       .andAnswer(new IAnswer[(Option[String], Int)] {
         override def answer(): (Option[String], Int) = {
-          (Some(ProducerIdManager.generatePidBlockJson(ProducerIdBlock(0,
+          (Some(ProducerIdManager.generateProducerIdBlockJson(ProducerIdBlock(0,
             Long.MaxValue - ProducerIdManager.PidBlockSize,
             Long.MaxValue))), 0)
         }

http://git-wip-us.apache.org/repos/asf/kafka/blob/a1c8e7d9/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorIntegrationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorIntegrationTest.scala b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorIntegrationTest.scala
index df23952..83cba71 100644
--- a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorIntegrationTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorIntegrationTest.scala
@@ -47,19 +47,19 @@ class TransactionCoordinatorIntegrationTest extends KafkaServerTestHarness {
 
     val tc = servers.head.transactionCoordinator
 
-    var initPidResult: InitPidResult = null
-    def callback(result: InitPidResult): Unit = {
-      initPidResult = result
+    var initProducerIdResult: InitProducerIdResult = null
+    def callback(result: InitProducerIdResult): Unit = {
+      initProducerIdResult = result
     }
 
     val txnId = "txn"
-    tc.handleInitPid(txnId, 900000, callback)
+    tc.handleInitProducerId(txnId, 900000, callback)
 
-    while(initPidResult == null) {
+    while(initProducerIdResult == null) {
       Utils.sleep(1)
     }
 
-    Assert.assertEquals(Errors.NONE, initPidResult.error)
+    Assert.assertEquals(Errors.NONE, initProducerIdResult.error)
 
     @volatile var addPartitionErrors: Errors = null
     def addPartitionsCallback(errors: Errors): Unit = {
@@ -67,8 +67,8 @@ class TransactionCoordinatorIntegrationTest extends KafkaServerTestHarness {
     }
 
     tc.handleAddPartitionsToTransaction(txnId,
-      initPidResult.pid,
-      initPidResult.epoch,
+      initProducerIdResult.producerId,
+      initProducerIdResult.producerEpoch,
       Set[TopicPartition](new TopicPartition(topic, 0)),
       addPartitionsCallback
     )

http://git-wip-us.apache.org/repos/asf/kafka/blob/a1c8e7d9/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorTest.scala b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorTest.scala
index 395bfb9..2f4f572 100644
--- a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorTest.scala
@@ -58,11 +58,11 @@ class TransactionCoordinatorTest {
     txnMarkerPurgatory,
     time)
 
-  var result: InitPidResult = _
+  var result: InitProducerIdResult = _
   var error: Errors = Errors.NONE
 
   private def mockPidManager(): Unit = {
-    EasyMock.expect(pidManager.nextPid())
+    EasyMock.expect(pidManager.generateProducerId())
       .andAnswer(new IAnswer[Long] {
         override def answer(): Long = {
           nextPid += 1
@@ -90,10 +90,10 @@ class TransactionCoordinatorTest {
     mockPidManager()
     EasyMock.replay(pidManager)
 
-    coordinator.handleInitPid("", txnTimeoutMs, initPidMockCallback)
-    assertEquals(InitPidResult(0L, 0, Errors.NONE), result)
-    coordinator.handleInitPid("", txnTimeoutMs, initPidMockCallback)
-    assertEquals(InitPidResult(1L, 0, Errors.NONE), result)
+    coordinator.handleInitProducerId("", txnTimeoutMs, initProducerIdMockCallback)
+    assertEquals(InitProducerIdResult(0L, 0, Errors.NONE), result)
+    coordinator.handleInitProducerId("", txnTimeoutMs, initProducerIdMockCallback)
+    assertEquals(InitProducerIdResult(1L, 0, Errors.NONE), result)
   }
 
   @Test
@@ -101,10 +101,10 @@ class TransactionCoordinatorTest {
     mockPidManager()
     EasyMock.replay(pidManager)
 
-    coordinator.handleInitPid(null, txnTimeoutMs, initPidMockCallback)
-    assertEquals(InitPidResult(0L, 0, Errors.NONE), result)
-    coordinator.handleInitPid(null, txnTimeoutMs, initPidMockCallback)
-    assertEquals(InitPidResult(1L, 0, Errors.NONE), result)
+    coordinator.handleInitProducerId(null, txnTimeoutMs, initProducerIdMockCallback)
+    assertEquals(InitProducerIdResult(0L, 0, Errors.NONE), result)
+    coordinator.handleInitProducerId(null, txnTimeoutMs, initProducerIdMockCallback)
+    assertEquals(InitProducerIdResult(1L, 0, Errors.NONE), result)
   }
 
   @Test
@@ -143,16 +143,16 @@ class TransactionCoordinatorTest {
       .anyTimes()
     EasyMock.replay(pidManager, transactionManager)
 
-    coordinator.handleInitPid(transactionalId, txnTimeoutMs, initPidMockCallback)
-    assertEquals(InitPidResult(nextPid - 1, 0, Errors.NONE), result)
+    coordinator.handleInitProducerId(transactionalId, txnTimeoutMs, initProducerIdMockCallback)
+    assertEquals(InitProducerIdResult(nextPid - 1, 0, Errors.NONE), result)
   }
 
   @Test
   def shouldRespondWithNotCoordinatorOnInitPidWhenNotCoordinatorForId(): Unit = {
     mockPidManager()
     EasyMock.replay(pidManager)
-    coordinator.handleInitPid("some-pid", txnTimeoutMs, initPidMockCallback)
-    assertEquals(InitPidResult(-1, -1, Errors.NOT_COORDINATOR), result)
+    coordinator.handleInitProducerId("some-pid", txnTimeoutMs, initProducerIdMockCallback)
+    assertEquals(InitProducerIdResult(-1, -1, Errors.NOT_COORDINATOR), result)
   }
 
   @Test
@@ -165,7 +165,7 @@ class TransactionCoordinatorTest {
     EasyMock.replay(transactionManager)
 
     coordinator.handleAddPartitionsToTransaction(transactionalId, 0L, 1, partitions, errorsCallback)
-    assertEquals(Errors.INVALID_PID_MAPPING, error)
+    assertEquals(Errors.INVALID_PRODUCER_ID_MAPPING, error)
   }
 
   @Test
@@ -299,7 +299,7 @@ class TransactionCoordinatorTest {
     EasyMock.replay(transactionManager)
 
     coordinator.handleEndTransaction(transactionalId, 0, 0, TransactionResult.COMMIT, errorsCallback)
-    assertEquals(Errors.INVALID_PID_MAPPING, error)
+    assertEquals(Errors.INVALID_PRODUCER_ID_MAPPING, error)
     EasyMock.verify(transactionManager)
   }
 
@@ -312,7 +312,7 @@ class TransactionCoordinatorTest {
     EasyMock.replay(transactionManager)
 
     coordinator.handleEndTransaction(transactionalId, 0, 0, TransactionResult.COMMIT, errorsCallback)
-    assertEquals(Errors.INVALID_PID_MAPPING, error)
+    assertEquals(Errors.INVALID_PRODUCER_ID_MAPPING, error)
     EasyMock.verify(transactionManager)
   }
 
@@ -513,9 +513,9 @@ class TransactionCoordinatorTest {
 
     EasyMock.replay(transactionManager, transactionMarkerChannelManager)
 
-    coordinator.handleInitPid(transactionalId, txnTimeoutMs, initPidMockCallback)
+    coordinator.handleInitProducerId(transactionalId, txnTimeoutMs, initProducerIdMockCallback)
 
-    assertEquals(InitPidResult(-1, -1, Errors.CONCURRENT_TRANSACTIONS), result)
+    assertEquals(InitProducerIdResult(-1, -1, Errors.CONCURRENT_TRANSACTIONS), result)
     EasyMock.verify(transactionManager)
   }
 
@@ -568,7 +568,7 @@ class TransactionCoordinatorTest {
 
     EasyMock.expect(transactionManager.transactionsToExpire())
       .andReturn(List(TransactionalIdAndProducerIdEpoch(transactionalId, pid, epoch)))
-    
+
     EasyMock.replay(transactionManager, transactionMarkerChannelManager)
 
     coordinator.startup(false)
@@ -589,9 +589,9 @@ class TransactionCoordinatorTest {
 
     EasyMock.replay(transactionManager)
 
-    coordinator.handleInitPid(transactionalId, 10, initPidMockCallback)
+    coordinator.handleInitProducerId(transactionalId, 10, initProducerIdMockCallback)
 
-    assertEquals(InitPidResult(-1, -1, Errors.CONCURRENT_TRANSACTIONS), result)
+    assertEquals(InitProducerIdResult(-1, -1, Errors.CONCURRENT_TRANSACTIONS), result)
   }
 
   private def validateIncrementEpochAndUpdateMetadata(state: TransactionState) = {
@@ -620,9 +620,9 @@ class TransactionCoordinatorTest {
     EasyMock.replay(transactionManager)
 
     val newTxnTimeoutMs = 10
-    coordinator.handleInitPid(transactionalId, newTxnTimeoutMs, initPidMockCallback)
+    coordinator.handleInitProducerId(transactionalId, newTxnTimeoutMs, initProducerIdMockCallback)
 
-    assertEquals(InitPidResult(pid, (epoch + 1).toShort, Errors.NONE), result)
+    assertEquals(InitProducerIdResult(pid, (epoch + 1).toShort, Errors.NONE), result)
     assertEquals(newTxnTimeoutMs, metadata.txnTimeoutMs)
     assertEquals(time.milliseconds(), metadata.txnLastUpdateTimestamp)
     assertEquals((epoch + 1).toShort, metadata.producerEpoch)
@@ -704,7 +704,7 @@ class TransactionCoordinatorTest {
     completedMetadata
   }
 
-  def initPidMockCallback(ret: InitPidResult): Unit = {
+  def initProducerIdMockCallback(ret: InitProducerIdResult): Unit = {
     result = ret
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/a1c8e7d9/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
index 9270544..425b9f1 100644
--- a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
+++ b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
@@ -252,7 +252,7 @@ class RequestQuotaTest extends BaseRequestTest {
           new DeleteRecordsRequest.Builder(5000, Map(tp -> (0L: java.lang.Long)).asJava)
 
         case ApiKeys.INIT_PRODUCER_ID =>
-          new InitPidRequest.Builder("abc")
+          new InitProducerIdRequest.Builder("abc")
 
         case ApiKeys.OFFSET_FOR_LEADER_EPOCH =>
           new OffsetsForLeaderEpochRequest.Builder().add(tp, 0)
@@ -353,7 +353,7 @@ class RequestQuotaTest extends BaseRequestTest {
       case ApiKeys.CREATE_TOPICS => new CreateTopicsResponse(response).throttleTimeMs
       case ApiKeys.DELETE_TOPICS => new DeleteTopicsResponse(response).throttleTimeMs
       case ApiKeys.DELETE_RECORDS => new DeleteRecordsResponse(response).throttleTimeMs
-      case ApiKeys.INIT_PRODUCER_ID => new InitPidResponse(response).throttleTimeMs
+      case ApiKeys.INIT_PRODUCER_ID => new InitProducerIdResponse(response).throttleTimeMs
       case ApiKeys.ADD_PARTITIONS_TO_TXN => new AddPartitionsToTxnResponse(response).throttleTimeMs
       case ApiKeys.ADD_OFFSETS_TO_TXN => new AddOffsetsToTxnResponse(response).throttleTimeMs
       case ApiKeys.END_TXN => new EndTxnResponse(response).throttleTimeMs


[2/2] kafka git commit: MINOR: Rename InitPidRequest/InitPidResponse to InitProducerIdRequest/InitProducerIdResponse

Posted by jg...@apache.org.
MINOR: Rename InitPidRequest/InitPidResponse to InitProducerIdRequest/InitProducerIdResponse

Author: Jason Gustafson <ja...@confluent.io>

Reviewers: Guozhang Wang <wa...@gmail.com>

Closes #2997 from hachikuji/minor-rename-initpid


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/a1c8e7d9
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/a1c8e7d9
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/a1c8e7d9

Branch: refs/heads/trunk
Commit: a1c8e7d941ad9c765dac232435a297f905eeeed5
Parents: 1cb39f7
Author: Jason Gustafson <ja...@confluent.io>
Authored: Fri May 12 19:59:34 2017 -0700
Committer: Jason Gustafson <ja...@confluent.io>
Committed: Fri May 12 19:59:34 2017 -0700

----------------------------------------------------------------------
 .../kafka/clients/producer/KafkaProducer.java   |   2 +-
 .../clients/producer/internals/PidAndEpoch.java |  36 ------
 .../producer/internals/ProducerBatch.java       |   4 +-
 .../producer/internals/ProducerIdAndEpoch.java  |  36 ++++++
 .../producer/internals/RecordAccumulator.java   |  12 +-
 .../clients/producer/internals/Sender.java      |  24 ++--
 .../producer/internals/TransactionManager.java  |  64 +++++-----
 .../errors/InvalidTxnTimeoutException.java      |   2 +-
 .../apache/kafka/common/protocol/Errors.java    |   3 +-
 .../kafka/common/requests/AbstractRequest.java  |   2 +-
 .../kafka/common/requests/AbstractResponse.java |   2 +-
 .../kafka/common/requests/InitPidRequest.java   | 104 ----------------
 .../kafka/common/requests/InitPidResponse.java  |  89 --------------
 .../common/requests/InitProducerIdRequest.java  | 104 ++++++++++++++++
 .../common/requests/InitProducerIdResponse.java |  89 ++++++++++++++
 .../clients/producer/internals/SenderTest.java  |  20 ++--
 .../internals/TransactionManagerTest.java       |  26 ++--
 .../common/requests/RequestResponseTest.java    |   8 +-
 .../transaction/ProducerIdManager.scala         | 119 ++++++++++---------
 .../transaction/TransactionCoordinator.scala    |  90 +++++++-------
 .../TransactionMarkerChannelManager.scala       |   7 +-
 .../scala/kafka/log/ProducerStateManager.scala  |  84 ++++++-------
 .../src/main/scala/kafka/server/KafkaApis.scala |  29 +++--
 .../main/scala/kafka/server/KafkaConfig.scala   |   2 +-
 core/src/main/scala/kafka/utils/ZkUtils.scala   |   6 +-
 .../transaction/ProducerIdManagerTest.scala     |  14 +--
 .../TransactionCoordinatorIntegrationTest.scala |  16 +--
 .../TransactionCoordinatorTest.scala            |  50 ++++----
 .../unit/kafka/server/RequestQuotaTest.scala    |   4 +-
 29 files changed, 525 insertions(+), 523 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/a1c8e7d9/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index b1f405a..05edf65 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -688,7 +688,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
         if (transactionManager == null)
             return;
 
-        if (transactionManager.isTransactional() && !transactionManager.hasPid())
+        if (transactionManager.isTransactional() && !transactionManager.hasProducerId())
             throw new IllegalStateException("Cannot perform a 'send' before completing a call to initTransactions when transactions are enabled.");
 
         if (transactionManager.isFenced())

http://git-wip-us.apache.org/repos/asf/kafka/blob/a1c8e7d9/clients/src/main/java/org/apache/kafka/clients/producer/internals/PidAndEpoch.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/PidAndEpoch.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/PidAndEpoch.java
deleted file mode 100644
index 8647a7b..0000000
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/PidAndEpoch.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.clients.producer.internals;
-
-import static org.apache.kafka.common.record.RecordBatch.NO_PRODUCER_EPOCH;
-import static org.apache.kafka.common.record.RecordBatch.NO_PRODUCER_ID;
-
-class PidAndEpoch {
-    static final PidAndEpoch NONE = new PidAndEpoch(NO_PRODUCER_ID, NO_PRODUCER_EPOCH);
-
-    public final long producerId;
-    public final short epoch;
-
-    PidAndEpoch(long producerId, short epoch) {
-        this.producerId = producerId;
-        this.epoch = epoch;
-    }
-
-    public boolean isValid() {
-        return NO_PRODUCER_ID < producerId;
-    }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/a1c8e7d9/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java
index f5fe8e6..3c5965a 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java
@@ -231,8 +231,8 @@ public final class ProducerBatch {
         return recordsBuilder.isFull();
     }
 
-    public void setProducerState(PidAndEpoch pidAndEpoch, int baseSequence) {
-        recordsBuilder.setProducerState(pidAndEpoch.producerId, pidAndEpoch.epoch, baseSequence);
+    public void setProducerState(ProducerIdAndEpoch producerIdAndEpoch, int baseSequence) {
+        recordsBuilder.setProducerState(producerIdAndEpoch.producerId, producerIdAndEpoch.epoch, baseSequence);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/kafka/blob/a1c8e7d9/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerIdAndEpoch.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerIdAndEpoch.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerIdAndEpoch.java
new file mode 100644
index 0000000..01d5e86
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerIdAndEpoch.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.producer.internals;
+
+import static org.apache.kafka.common.record.RecordBatch.NO_PRODUCER_EPOCH;
+import static org.apache.kafka.common.record.RecordBatch.NO_PRODUCER_ID;
+
+class ProducerIdAndEpoch {
+    static final ProducerIdAndEpoch NONE = new ProducerIdAndEpoch(NO_PRODUCER_ID, NO_PRODUCER_EPOCH);
+
+    public final long producerId;
+    public final short epoch;
+
+    ProducerIdAndEpoch(long producerId, short epoch) {
+        this.producerId = producerId;
+        this.epoch = epoch;
+    }
+
+    public boolean isValid() {
+        return NO_PRODUCER_ID < producerId;
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/a1c8e7d9/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
index 4ffab0a..cf3736c 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
@@ -444,16 +444,16 @@ public final class RecordAccumulator {
                                         // request
                                         break;
                                     } else {
-                                        PidAndEpoch pidAndEpoch = null;
+                                        ProducerIdAndEpoch producerIdAndEpoch = null;
                                         if (transactionManager != null) {
-                                            pidAndEpoch = transactionManager.pidAndEpoch();
-                                            if (!pidAndEpoch.isValid())
+                                            producerIdAndEpoch = transactionManager.pidAndEpoch();
+                                            if (!producerIdAndEpoch.isValid())
                                                 // we cannot send the batch until we have refreshed the PID
                                                 break;
                                         }
 
                                         ProducerBatch batch = deque.pollFirst();
-                                        if (pidAndEpoch != null && !batch.inRetry()) {
+                                        if (producerIdAndEpoch != null && !batch.inRetry()) {
                                             // If the batch is in retry, then we should not change the pid and
                                             // sequence number, since this may introduce duplicates. In particular,
                                             // the previous attempt may actually have been accepted, and if we change
@@ -461,9 +461,9 @@ public final class RecordAccumulator {
                                             // a duplicate.
                                             int sequenceNumber = transactionManager.sequenceNumber(batch.topicPartition);
                                             log.debug("Dest: {} : producerId: {}, epoch: {}, Assigning sequence for {}: {}",
-                                                    node, pidAndEpoch.producerId, pidAndEpoch.epoch,
+                                                    node, producerIdAndEpoch.producerId, producerIdAndEpoch.epoch,
                                                     batch.topicPartition, sequenceNumber);
-                                            batch.setProducerState(pidAndEpoch, sequenceNumber);
+                                            batch.setProducerState(producerIdAndEpoch, sequenceNumber);
                                         }
                                         batch.close();
                                         size += batch.sizeInBytes();

http://git-wip-us.apache.org/repos/asf/kafka/blob/a1c8e7d9/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
index 4d95ac0..8b96b41 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
@@ -42,8 +42,8 @@ import org.apache.kafka.common.metrics.stats.Rate;
 import org.apache.kafka.clients.NetworkClientUtils;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.record.MemoryRecords;
-import org.apache.kafka.common.requests.InitPidRequest;
-import org.apache.kafka.common.requests.InitPidResponse;
+import org.apache.kafka.common.requests.InitProducerIdRequest;
+import org.apache.kafka.common.requests.InitProducerIdResponse;
 import org.apache.kafka.common.requests.ProduceRequest;
 import org.apache.kafka.common.requests.ProduceResponse;
 import org.apache.kafka.common.utils.Time;
@@ -357,7 +357,7 @@ public class Sender implements Runnable {
 
     private ClientResponse sendAndAwaitInitPidRequest(Node node) throws IOException {
         String nodeId = node.idString();
-        InitPidRequest.Builder builder = new InitPidRequest.Builder(null);
+        InitProducerIdRequest.Builder builder = new InitProducerIdRequest.Builder(null);
         ClientRequest request = client.newClientRequest(nodeId, builder, time.milliseconds(), true, null);
         return NetworkClientUtils.sendAndReceive(client, request, time);
     }
@@ -376,28 +376,28 @@ public class Sender implements Runnable {
         if (transactionManager == null || transactionManager.isTransactional())
             return;
 
-        while (!transactionManager.hasPid()) {
+        while (!transactionManager.hasProducerId()) {
             try {
                 Node node = awaitLeastLoadedNodeReady(requestTimeout);
                 if (node != null) {
                     ClientResponse response = sendAndAwaitInitPidRequest(node);
-                    if (response.hasResponse() && (response.responseBody() instanceof InitPidResponse)) {
-                        InitPidResponse initPidResponse = (InitPidResponse) response.responseBody();
-                        PidAndEpoch pidAndEpoch = new PidAndEpoch(
-                                initPidResponse.producerId(), initPidResponse.epoch());
-                        transactionManager.setPidAndEpoch(pidAndEpoch);
+                    if (response.hasResponse() && (response.responseBody() instanceof InitProducerIdResponse)) {
+                        InitProducerIdResponse initProducerIdResponse = (InitProducerIdResponse) response.responseBody();
+                        ProducerIdAndEpoch producerIdAndEpoch = new ProducerIdAndEpoch(
+                                initProducerIdResponse.producerId(), initProducerIdResponse.epoch());
+                        transactionManager.setProducerIdAndEpoch(producerIdAndEpoch);
                     } else {
-                        log.error("Received an unexpected response type for an InitPidRequest from {}. " +
+                        log.error("Received an unexpected response type for an InitProducerIdRequest from {}. " +
                                 "We will back off and try again.", node);
                     }
                 } else {
-                    log.debug("Could not find an available broker to send InitPidRequest to. " +
+                    log.debug("Could not find an available broker to send InitProducerIdRequest to. " +
                             "We will back off and try again.");
                 }
             } catch (Exception e) {
                 log.warn("Received an exception while trying to get a pid. Will back off and retry.", e);
             }
-            log.trace("Retry InitPidRequest in {}ms.", retryBackoffMs);
+            log.trace("Retry InitProducerIdRequest in {}ms.", retryBackoffMs);
             time.sleep(retryBackoffMs);
             metadata.requestUpdate();
         }

http://git-wip-us.apache.org/repos/asf/kafka/blob/a1c8e7d9/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
index ff3f114..566ad7c 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
@@ -35,8 +35,8 @@ import org.apache.kafka.common.requests.EndTxnRequest;
 import org.apache.kafka.common.requests.EndTxnResponse;
 import org.apache.kafka.common.requests.FindCoordinatorRequest;
 import org.apache.kafka.common.requests.FindCoordinatorResponse;
-import org.apache.kafka.common.requests.InitPidRequest;
-import org.apache.kafka.common.requests.InitPidResponse;
+import org.apache.kafka.common.requests.InitProducerIdRequest;
+import org.apache.kafka.common.requests.InitProducerIdResponse;
 import org.apache.kafka.common.requests.OffsetCommitRequest;
 import org.apache.kafka.common.requests.TransactionResult;
 import org.apache.kafka.common.requests.TxnOffsetCommitRequest;
@@ -79,7 +79,7 @@ public class TransactionManager {
 
     private volatile State currentState = State.UNINITIALIZED;
     private volatile Exception lastError = null;
-    private volatile PidAndEpoch pidAndEpoch;
+    private volatile ProducerIdAndEpoch producerIdAndEpoch;
 
     private enum State {
         UNINITIALIZED,
@@ -130,7 +130,7 @@ public class TransactionManager {
     }
 
     public TransactionManager(String transactionalId, int transactionTimeoutMs) {
-        this.pidAndEpoch = new PidAndEpoch(NO_PRODUCER_ID, NO_PRODUCER_EPOCH);
+        this.producerIdAndEpoch = new ProducerIdAndEpoch(NO_PRODUCER_ID, NO_PRODUCER_EPOCH);
         this.sequenceNumbers = new HashMap<>();
         this.transactionalId = transactionalId;
         this.transactionTimeoutMs = transactionTimeoutMs;
@@ -155,10 +155,10 @@ public class TransactionManager {
     public synchronized TransactionalRequestResult initializeTransactions() {
         ensureTransactional();
         transitionTo(State.INITIALIZING);
-        setPidAndEpoch(PidAndEpoch.NONE);
+        setProducerIdAndEpoch(ProducerIdAndEpoch.NONE);
         this.sequenceNumbers.clear();
-        InitPidRequest.Builder builder = new InitPidRequest.Builder(transactionalId, transactionTimeoutMs);
-        InitPidHandler handler = new InitPidHandler(builder);
+        InitProducerIdRequest.Builder builder = new InitProducerIdRequest.Builder(transactionalId, transactionTimeoutMs);
+        InitProducerIdHandler handler = new InitProducerIdHandler(builder);
         pendingRequests.add(handler);
         return handler.result;
     }
@@ -190,8 +190,8 @@ public class TransactionManager {
         }
 
         TransactionResult transactionResult = isCommit ? TransactionResult.COMMIT : TransactionResult.ABORT;
-        EndTxnRequest.Builder builder = new EndTxnRequest.Builder(transactionalId, pidAndEpoch.producerId,
-                pidAndEpoch.epoch, transactionResult);
+        EndTxnRequest.Builder builder = new EndTxnRequest.Builder(transactionalId, producerIdAndEpoch.producerId,
+                producerIdAndEpoch.epoch, transactionResult);
         EndTxnHandler handler = new EndTxnHandler(builder);
         pendingRequests.add(handler);
         return handler.result;
@@ -206,7 +206,7 @@ public class TransactionManager {
                     "active transaction");
 
         AddOffsetsToTxnRequest.Builder builder = new AddOffsetsToTxnRequest.Builder(transactionalId,
-                pidAndEpoch.producerId, pidAndEpoch.epoch, consumerGroupId);
+                producerIdAndEpoch.producerId, producerIdAndEpoch.epoch, consumerGroupId);
         AddOffsetsToTxnHandler handler = new AddOffsetsToTxnHandler(builder, offsets);
         pendingRequests.add(handler);
         return handler.result;
@@ -226,8 +226,8 @@ public class TransactionManager {
         return transactionalId;
     }
 
-    public boolean hasPid() {
-        return pidAndEpoch.isValid();
+    public boolean hasProducerId() {
+        return producerIdAndEpoch.isValid();
     }
 
     public boolean isTransactional() {
@@ -262,20 +262,20 @@ public class TransactionManager {
     }
 
     /**
-     * Get the current pid and epoch without blocking. Callers must use {@link PidAndEpoch#isValid()} to
+     * Get the current pid and epoch without blocking. Callers must use {@link ProducerIdAndEpoch#isValid()} to
      * verify that the result is valid.
      *
-     * @return the current PidAndEpoch.
+     * @return the current ProducerIdAndEpoch.
      */
-    PidAndEpoch pidAndEpoch() {
-        return pidAndEpoch;
+    ProducerIdAndEpoch pidAndEpoch() {
+        return producerIdAndEpoch;
     }
 
     /**
      * Set the pid and epoch atomically.
      */
-    void setPidAndEpoch(PidAndEpoch pidAndEpoch) {
-        this.pidAndEpoch = pidAndEpoch;
+    void setProducerIdAndEpoch(ProducerIdAndEpoch producerIdAndEpoch) {
+        this.producerIdAndEpoch = producerIdAndEpoch;
     }
 
     /**
@@ -299,7 +299,7 @@ public class TransactionManager {
         if (isTransactional())
             throw new IllegalStateException("Cannot reset producer state for a transactional producer. " +
                     "You must either abort the ongoing transaction or reinitialize the transactional producer instead");
-        setPidAndEpoch(PidAndEpoch.NONE);
+        setProducerIdAndEpoch(ProducerIdAndEpoch.NONE);
         this.sequenceNumbers.clear();
     }
 
@@ -448,7 +448,7 @@ public class TransactionManager {
         pendingPartitionsToBeAddedToTransaction.addAll(newPartitionsToBeAddedToTransaction);
         newPartitionsToBeAddedToTransaction.clear();
         AddPartitionsToTxnRequest.Builder builder = new AddPartitionsToTxnRequest.Builder(transactionalId,
-                pidAndEpoch.producerId, pidAndEpoch.epoch, new ArrayList<>(pendingPartitionsToBeAddedToTransaction));
+                producerIdAndEpoch.producerId, producerIdAndEpoch.epoch, new ArrayList<>(pendingPartitionsToBeAddedToTransaction));
         return new AddPartitionsToTxnHandler(builder);
     }
 
@@ -461,7 +461,7 @@ public class TransactionManager {
             pendingTxnOffsetCommits.put(entry.getKey(), committedOffset);
         }
         TxnOffsetCommitRequest.Builder builder = new TxnOffsetCommitRequest.Builder(consumerGroupId,
-                pidAndEpoch.producerId, pidAndEpoch.epoch, OffsetCommitRequest.DEFAULT_RETENTION_TIME,
+                producerIdAndEpoch.producerId, producerIdAndEpoch.epoch, OffsetCommitRequest.DEFAULT_RETENTION_TIME,
                 pendingTxnOffsetCommits);
         return new TxnOffsetCommitHandler(result, builder);
     }
@@ -487,7 +487,7 @@ public class TransactionManager {
         void fenced() {
             log.error("Producer has become invalid, which typically means another producer with the same " +
                             "transactional.id has been started: producerId: {}. epoch: {}.",
-                    pidAndEpoch.producerId, pidAndEpoch.epoch);
+                    producerIdAndEpoch.producerId, producerIdAndEpoch.epoch);
             result.setError(Errors.INVALID_PRODUCER_EPOCH.exception());
             transitionTo(State.FENCED, Errors.INVALID_PRODUCER_EPOCH.exception());
             result.done();
@@ -548,15 +548,15 @@ public class TransactionManager {
         abstract Priority priority();
     }
 
-    private class InitPidHandler extends TxnRequestHandler {
-        private final InitPidRequest.Builder builder;
+    private class InitProducerIdHandler extends TxnRequestHandler {
+        private final InitProducerIdRequest.Builder builder;
 
-        private InitPidHandler(InitPidRequest.Builder builder) {
+        private InitProducerIdHandler(InitProducerIdRequest.Builder builder) {
             this.builder = builder;
         }
 
         @Override
-        InitPidRequest.Builder requestBuilder() {
+        InitProducerIdRequest.Builder requestBuilder() {
             return builder;
         }
 
@@ -567,11 +567,11 @@ public class TransactionManager {
 
         @Override
         public void handleResponse(AbstractResponse response) {
-            InitPidResponse initPidResponse = (InitPidResponse) response;
-            Errors error = initPidResponse.error();
+            InitProducerIdResponse initProducerIdResponse = (InitProducerIdResponse) response;
+            Errors error = initProducerIdResponse.error();
             if (error == Errors.NONE) {
-                PidAndEpoch pidAndEpoch = new PidAndEpoch(initPidResponse.producerId(), initPidResponse.epoch());
-                setPidAndEpoch(pidAndEpoch);
+                ProducerIdAndEpoch producerIdAndEpoch = new ProducerIdAndEpoch(initProducerIdResponse.producerId(), initProducerIdResponse.epoch());
+                setProducerIdAndEpoch(producerIdAndEpoch);
                 transitionTo(State.READY);
                 lastError = null;
                 result.done();
@@ -581,7 +581,7 @@ public class TransactionManager {
             } else if (error == Errors.COORDINATOR_LOAD_IN_PROGRESS || error == Errors.CONCURRENT_TRANSACTIONS) {
                 reenqueue();
             } else {
-                fatal(new KafkaException("Unexpected error in InitPidResponse; " + error.message()));
+                fatal(new KafkaException("Unexpected error in InitProducerIdResponse; " + error.message()));
             }
         }
     }
@@ -616,7 +616,7 @@ public class TransactionManager {
                 reenqueue();
             } else if (error == Errors.COORDINATOR_LOAD_IN_PROGRESS || error == Errors.CONCURRENT_TRANSACTIONS) {
                 reenqueue();
-            } else if (error == Errors.INVALID_PID_MAPPING) {
+            } else if (error == Errors.INVALID_PRODUCER_ID_MAPPING) {
                 fatal(new KafkaException(error.exception()));
             } else if (error == Errors.INVALID_TXN_STATE) {
                 fatal(new KafkaException(error.exception()));

http://git-wip-us.apache.org/repos/asf/kafka/blob/a1c8e7d9/clients/src/main/java/org/apache/kafka/common/errors/InvalidTxnTimeoutException.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/errors/InvalidTxnTimeoutException.java b/clients/src/main/java/org/apache/kafka/common/errors/InvalidTxnTimeoutException.java
index e5df248..c751bc4 100644
--- a/clients/src/main/java/org/apache/kafka/common/errors/InvalidTxnTimeoutException.java
+++ b/clients/src/main/java/org/apache/kafka/common/errors/InvalidTxnTimeoutException.java
@@ -17,7 +17,7 @@
 package org.apache.kafka.common.errors;
 
 /**
- * The transaction coordinator returns this error code if the timeout received via the InitPidRequest is larger than
+ * The transaction coordinator returns this error code if the timeout received via the InitProducerIdRequest is larger than
  * the `max.transaction.timeout.ms` config value.
  */
 public class InvalidTxnTimeoutException extends ApiException {

http://git-wip-us.apache.org/repos/asf/kafka/blob/a1c8e7d9/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
index 960fdda..58a0a2a 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
@@ -439,7 +439,8 @@ public enum Errors {
                 return new InvalidTxnStateException(message);
             }
         }),
-    INVALID_PID_MAPPING(49, "The PID mapping is invalid",
+    INVALID_PRODUCER_ID_MAPPING(49, "The producer attempted to use a producerId which is not currently assigned to " +
+            "its transactionalId",
         new ApiExceptionBuilder() {
             @Override
             public ApiException build(String message) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/a1c8e7d9/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
index 04f2602..3aeb879 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
@@ -179,7 +179,7 @@ public abstract class AbstractRequest extends AbstractRequestResponse {
                 request = new DeleteRecordsRequest(struct, version);
                 break;
             case INIT_PRODUCER_ID:
-                request = new InitPidRequest(struct, version);
+                request = new InitProducerIdRequest(struct, version);
                 break;
             case OFFSET_FOR_LEADER_EPOCH:
                 request = new OffsetsForLeaderEpochRequest(struct, version);

http://git-wip-us.apache.org/repos/asf/kafka/blob/a1c8e7d9/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
index b76cb21..617934c 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
@@ -95,7 +95,7 @@ public abstract class AbstractResponse extends AbstractRequestResponse {
             case DELETE_RECORDS:
                 return new DeleteRecordsResponse(struct);
             case INIT_PRODUCER_ID:
-                return new InitPidResponse(struct);
+                return new InitProducerIdResponse(struct);
             case OFFSET_FOR_LEADER_EPOCH:
                 return new OffsetsForLeaderEpochResponse(struct);
             case ADD_PARTITIONS_TO_TXN:

http://git-wip-us.apache.org/repos/asf/kafka/blob/a1c8e7d9/clients/src/main/java/org/apache/kafka/common/requests/InitPidRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/InitPidRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/InitPidRequest.java
deleted file mode 100644
index 57d32e2..0000000
--- a/clients/src/main/java/org/apache/kafka/common/requests/InitPidRequest.java
+++ /dev/null
@@ -1,104 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.common.requests;
-
-import org.apache.kafka.common.protocol.ApiKeys;
-import org.apache.kafka.common.protocol.Errors;
-import org.apache.kafka.common.protocol.types.Struct;
-
-import java.nio.ByteBuffer;
-
-public class InitPidRequest extends AbstractRequest {
-    public static final int NO_TRANSACTION_TIMEOUT_MS = Integer.MAX_VALUE;
-
-    private static final String TRANSACTIONAL_ID_KEY_NAME = "transactional_id";
-    private static final String TRANSACTION_TIMEOUT_KEY_NAME = "transaction_timeout_ms";
-
-    private final String transactionalId;
-    private final int transactionTimeoutMs;
-
-    public static class Builder extends AbstractRequest.Builder<InitPidRequest> {
-        private final String transactionalId;
-        private final int transactionTimeoutMs;
-
-        public Builder(String transactionalId) {
-            this(transactionalId, NO_TRANSACTION_TIMEOUT_MS);
-        }
-
-        public Builder(String transactionalId, int transactionTimeoutMs) {
-            super(ApiKeys.INIT_PRODUCER_ID);
-
-            if (transactionTimeoutMs <= 0)
-                throw new IllegalArgumentException("transaction timeout value is not positive: " + transactionTimeoutMs);
-
-            if (transactionalId != null && transactionalId.isEmpty())
-                throw new IllegalArgumentException("Must set either a null or a non-empty transactional id.");
-
-            this.transactionalId = transactionalId;
-            this.transactionTimeoutMs = transactionTimeoutMs;
-        }
-
-        @Override
-        public InitPidRequest build(short version) {
-            return new InitPidRequest(version, transactionalId, transactionTimeoutMs);
-        }
-
-        @Override
-        public String toString() {
-            return "(type=InitPidRequest, transactionalId=" + transactionalId + ", transactionTimeoutMs=" +
-                    transactionTimeoutMs + ")";
-        }
-    }
-
-    public InitPidRequest(Struct struct, short version) {
-        super(version);
-        this.transactionalId = struct.getString(TRANSACTIONAL_ID_KEY_NAME);
-        this.transactionTimeoutMs = struct.getInt(TRANSACTION_TIMEOUT_KEY_NAME);
-    }
-
-    private InitPidRequest(short version, String transactionalId, int transactionTimeoutMs) {
-        super(version);
-        this.transactionalId = transactionalId;
-        this.transactionTimeoutMs = transactionTimeoutMs;
-    }
-
-    @Override
-    public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) {
-        return new InitPidResponse(throttleTimeMs, Errors.forException(e));
-    }
-
-    public static InitPidRequest parse(ByteBuffer buffer, short version) {
-        return new InitPidRequest(ApiKeys.INIT_PRODUCER_ID.parseRequest(version, buffer), version);
-    }
-
-    public String transactionalId() {
-        return transactionalId;
-    }
-
-    public int transactionTimeoutMs() {
-        return transactionTimeoutMs;
-    }
-
-    @Override
-    protected Struct toStruct() {
-        Struct struct = new Struct(ApiKeys.INIT_PRODUCER_ID.requestSchema(version()));
-        struct.set(TRANSACTIONAL_ID_KEY_NAME, transactionalId);
-        struct.set(TRANSACTION_TIMEOUT_KEY_NAME, transactionTimeoutMs);
-        return struct;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/a1c8e7d9/clients/src/main/java/org/apache/kafka/common/requests/InitPidResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/InitPidResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/InitPidResponse.java
deleted file mode 100644
index 3c858af..0000000
--- a/clients/src/main/java/org/apache/kafka/common/requests/InitPidResponse.java
+++ /dev/null
@@ -1,89 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.common.requests;
-
-import org.apache.kafka.common.protocol.ApiKeys;
-import org.apache.kafka.common.protocol.Errors;
-import org.apache.kafka.common.protocol.types.Struct;
-import org.apache.kafka.common.record.RecordBatch;
-
-import java.nio.ByteBuffer;
-
-public class InitPidResponse extends AbstractResponse {
-    /**
-     * Possible Error codes:
-     * OK
-     *
-     */
-    private static final String THROTTLE_TIME_KEY_NAME = "throttle_time_ms";
-    private static final String PRODUCER_ID_KEY_NAME = "producer_id";
-    private static final String EPOCH_KEY_NAME = "producer_epoch";
-    private static final String ERROR_CODE_KEY_NAME = "error_code";
-    private final int throttleTimeMs;
-    private final Errors error;
-    private final long producerId;
-    private final short epoch;
-
-    public InitPidResponse(int throttleTimeMs, Errors error, long producerId, short epoch) {
-        this.throttleTimeMs = throttleTimeMs;
-        this.error = error;
-        this.producerId = producerId;
-        this.epoch = epoch;
-    }
-
-    public InitPidResponse(Struct struct) {
-        this.throttleTimeMs = struct.getInt(THROTTLE_TIME_KEY_NAME);
-        this.error = Errors.forCode(struct.getShort(ERROR_CODE_KEY_NAME));
-        this.producerId = struct.getLong(PRODUCER_ID_KEY_NAME);
-        this.epoch = struct.getShort(EPOCH_KEY_NAME);
-    }
-
-    public InitPidResponse(int throttleTimeMs, Errors errors) {
-        this(throttleTimeMs, errors, RecordBatch.NO_PRODUCER_ID, (short) 0);
-    }
-
-    public int throttleTimeMs() {
-        return throttleTimeMs;
-    }
-
-    public long producerId() {
-        return producerId;
-    }
-
-    public Errors error() {
-        return error;
-    }
-
-    public short epoch() {
-        return epoch;
-    }
-
-    @Override
-    protected Struct toStruct(short version) {
-        Struct struct = new Struct(ApiKeys.INIT_PRODUCER_ID.responseSchema(version));
-        struct.set(THROTTLE_TIME_KEY_NAME, throttleTimeMs);
-        struct.set(PRODUCER_ID_KEY_NAME, producerId);
-        struct.set(EPOCH_KEY_NAME, epoch);
-        struct.set(ERROR_CODE_KEY_NAME, error.code());
-        return struct;
-    }
-
-    public static InitPidResponse parse(ByteBuffer buffer, short version) {
-        return new InitPidResponse(ApiKeys.INIT_PRODUCER_ID.parseResponse(version, buffer));
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/a1c8e7d9/clients/src/main/java/org/apache/kafka/common/requests/InitProducerIdRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/InitProducerIdRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/InitProducerIdRequest.java
new file mode 100644
index 0000000..45f88a2
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/requests/InitProducerIdRequest.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.types.Struct;
+
+import java.nio.ByteBuffer;
+
+public class InitProducerIdRequest extends AbstractRequest {
+    public static final int NO_TRANSACTION_TIMEOUT_MS = Integer.MAX_VALUE;
+
+    private static final String TRANSACTIONAL_ID_KEY_NAME = "transactional_id";
+    private static final String TRANSACTION_TIMEOUT_KEY_NAME = "transaction_timeout_ms";
+
+    private final String transactionalId;
+    private final int transactionTimeoutMs;
+
+    public static class Builder extends AbstractRequest.Builder<InitProducerIdRequest> {
+        private final String transactionalId;
+        private final int transactionTimeoutMs;
+
+        public Builder(String transactionalId) {
+            this(transactionalId, NO_TRANSACTION_TIMEOUT_MS);
+        }
+
+        public Builder(String transactionalId, int transactionTimeoutMs) {
+            super(ApiKeys.INIT_PRODUCER_ID);
+
+            if (transactionTimeoutMs <= 0)
+                throw new IllegalArgumentException("transaction timeout value is not positive: " + transactionTimeoutMs);
+
+            if (transactionalId != null && transactionalId.isEmpty())
+                throw new IllegalArgumentException("Must set either a null or a non-empty transactional id.");
+
+            this.transactionalId = transactionalId;
+            this.transactionTimeoutMs = transactionTimeoutMs;
+        }
+
+        @Override
+        public InitProducerIdRequest build(short version) {
+            return new InitProducerIdRequest(version, transactionalId, transactionTimeoutMs);
+        }
+
+        @Override
+        public String toString() {
+            return "(type=InitProducerIdRequest, transactionalId=" + transactionalId + ", transactionTimeoutMs=" +
+                    transactionTimeoutMs + ")";
+        }
+    }
+
+    public InitProducerIdRequest(Struct struct, short version) {
+        super(version);
+        this.transactionalId = struct.getString(TRANSACTIONAL_ID_KEY_NAME);
+        this.transactionTimeoutMs = struct.getInt(TRANSACTION_TIMEOUT_KEY_NAME);
+    }
+
+    private InitProducerIdRequest(short version, String transactionalId, int transactionTimeoutMs) {
+        super(version);
+        this.transactionalId = transactionalId;
+        this.transactionTimeoutMs = transactionTimeoutMs;
+    }
+
+    @Override
+    public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) {
+        return new InitProducerIdResponse(throttleTimeMs, Errors.forException(e));
+    }
+
+    public static InitProducerIdRequest parse(ByteBuffer buffer, short version) {
+        return new InitProducerIdRequest(ApiKeys.INIT_PRODUCER_ID.parseRequest(version, buffer), version);
+    }
+
+    public String transactionalId() {
+        return transactionalId;
+    }
+
+    public int transactionTimeoutMs() {
+        return transactionTimeoutMs;
+    }
+
+    @Override
+    protected Struct toStruct() {
+        Struct struct = new Struct(ApiKeys.INIT_PRODUCER_ID.requestSchema(version()));
+        struct.set(TRANSACTIONAL_ID_KEY_NAME, transactionalId);
+        struct.set(TRANSACTION_TIMEOUT_KEY_NAME, transactionTimeoutMs);
+        return struct;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/a1c8e7d9/clients/src/main/java/org/apache/kafka/common/requests/InitProducerIdResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/InitProducerIdResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/InitProducerIdResponse.java
new file mode 100644
index 0000000..7c8a6e5
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/requests/InitProducerIdResponse.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.types.Struct;
+import org.apache.kafka.common.record.RecordBatch;
+
+import java.nio.ByteBuffer;
+
+public class InitProducerIdResponse extends AbstractResponse {
+    /**
+     * Possible Error codes:
+     * OK
+     *
+     */
+    private static final String THROTTLE_TIME_KEY_NAME = "throttle_time_ms";
+    private static final String PRODUCER_ID_KEY_NAME = "producer_id";
+    private static final String EPOCH_KEY_NAME = "producer_epoch";
+    private static final String ERROR_CODE_KEY_NAME = "error_code";
+    private final int throttleTimeMs;
+    private final Errors error;
+    private final long producerId;
+    private final short epoch;
+
+    public InitProducerIdResponse(int throttleTimeMs, Errors error, long producerId, short epoch) {
+        this.throttleTimeMs = throttleTimeMs;
+        this.error = error;
+        this.producerId = producerId;
+        this.epoch = epoch;
+    }
+
+    public InitProducerIdResponse(Struct struct) {
+        this.throttleTimeMs = struct.getInt(THROTTLE_TIME_KEY_NAME);
+        this.error = Errors.forCode(struct.getShort(ERROR_CODE_KEY_NAME));
+        this.producerId = struct.getLong(PRODUCER_ID_KEY_NAME);
+        this.epoch = struct.getShort(EPOCH_KEY_NAME);
+    }
+
+    public InitProducerIdResponse(int throttleTimeMs, Errors errors) {
+        this(throttleTimeMs, errors, RecordBatch.NO_PRODUCER_ID, (short) 0);
+    }
+
+    public int throttleTimeMs() {
+        return throttleTimeMs;
+    }
+
+    public long producerId() {
+        return producerId;
+    }
+
+    public Errors error() {
+        return error;
+    }
+
+    public short epoch() {
+        return epoch;
+    }
+
+    @Override
+    protected Struct toStruct(short version) {
+        Struct struct = new Struct(ApiKeys.INIT_PRODUCER_ID.responseSchema(version));
+        struct.set(THROTTLE_TIME_KEY_NAME, throttleTimeMs);
+        struct.set(PRODUCER_ID_KEY_NAME, producerId);
+        struct.set(EPOCH_KEY_NAME, epoch);
+        struct.set(ERROR_CODE_KEY_NAME, error.code());
+        return struct;
+    }
+
+    public static InitProducerIdResponse parse(ByteBuffer buffer, short version) {
+        return new InitProducerIdResponse(ApiKeys.INIT_PRODUCER_ID.parseResponse(version, buffer));
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/a1c8e7d9/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
index c01a375..bb13dcb 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
@@ -37,9 +37,9 @@ import org.apache.kafka.common.record.RecordBatch;
 import org.apache.kafka.common.record.MemoryRecords;
 import org.apache.kafka.common.requests.AbstractRequest;
 import org.apache.kafka.common.requests.ApiVersionsResponse;
+import org.apache.kafka.common.requests.InitProducerIdRequest;
 import org.apache.kafka.common.requests.ProduceRequest;
-import org.apache.kafka.common.requests.InitPidRequest;
-import org.apache.kafka.common.requests.InitPidResponse;
+import org.apache.kafka.common.requests.InitProducerIdResponse;
 import org.apache.kafka.common.requests.ProduceResponse;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.test.TestUtils;
@@ -382,11 +382,11 @@ public class SenderTest {
         client.prepareResponse(new MockClient.RequestMatcher() {
             @Override
             public boolean matches(AbstractRequest body) {
-                return body instanceof InitPidRequest;
+                return body instanceof InitProducerIdRequest;
             }
-        }, new InitPidResponse(0, Errors.NONE, producerId, (short) 0));
+        }, new InitProducerIdResponse(0, Errors.NONE, producerId, (short) 0));
         sender.run(time.milliseconds());
-        assertTrue(transactionManager.hasPid());
+        assertTrue(transactionManager.hasProducerId());
         assertEquals(producerId, transactionManager.pidAndEpoch().producerId);
         assertEquals((short) 0, transactionManager.pidAndEpoch().epoch);
     }
@@ -395,7 +395,7 @@ public class SenderTest {
     public void testSequenceNumberIncrement() throws InterruptedException {
         final long producerId = 343434L;
         TransactionManager transactionManager = new TransactionManager();
-        transactionManager.setPidAndEpoch(new PidAndEpoch(producerId, (short) 0));
+        transactionManager.setProducerIdAndEpoch(new ProducerIdAndEpoch(producerId, (short) 0));
         setupWithTransactionState(transactionManager);
         client.setNode(new Node(1, "localhost", 33343));
 
@@ -448,7 +448,7 @@ public class SenderTest {
     public void testAbortRetryWhenPidChanges() throws InterruptedException {
         final long producerId = 343434L;
         TransactionManager transactionManager = new TransactionManager();
-        transactionManager.setPidAndEpoch(new PidAndEpoch(producerId, (short) 0));
+        transactionManager.setProducerIdAndEpoch(new ProducerIdAndEpoch(producerId, (short) 0));
         setupWithTransactionState(transactionManager);
         client.setNode(new Node(1, "localhost", 33343));
 
@@ -480,7 +480,7 @@ public class SenderTest {
         assertEquals(0, client.inFlightRequestCount());
         assertFalse("Client ready status should be false", client.isReady(node, 0L));
 
-        transactionManager.setPidAndEpoch(new PidAndEpoch(producerId + 1, (short) 0));
+        transactionManager.setProducerIdAndEpoch(new ProducerIdAndEpoch(producerId + 1, (short) 0));
         sender.run(time.milliseconds()); // receive error
         sender.run(time.milliseconds()); // reconnect
         sender.run(time.milliseconds()); // nothing to do, since the pid has changed. We should check the metrics for errors.
@@ -497,7 +497,7 @@ public class SenderTest {
     public void testResetWhenOutOfOrderSequenceReceived() throws InterruptedException {
         final long producerId = 343434L;
         TransactionManager transactionManager = new TransactionManager();
-        transactionManager.setPidAndEpoch(new PidAndEpoch(producerId, (short) 0));
+        transactionManager.setProducerIdAndEpoch(new ProducerIdAndEpoch(producerId, (short) 0));
         setupWithTransactionState(transactionManager);
         client.setNode(new Node(1, "localhost", 33343));
 
@@ -528,7 +528,7 @@ public class SenderTest {
 
         sender.run(time.milliseconds());
         assertTrue(responseFuture.isDone());
-        assertFalse("Expected transaction state to be reset upon receiving an OutOfOrderSequenceException", transactionManager.hasPid());
+        assertFalse("Expected transaction state to be reset upon receiving an OutOfOrderSequenceException", transactionManager.hasProducerId());
     }
 
     private void completedWithError(Future<RecordMetadata> future, Errors error) throws Exception {

http://git-wip-us.apache.org/repos/asf/kafka/blob/a1c8e7d9/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
index 8e46eb7..53686e2 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
@@ -44,8 +44,8 @@ import org.apache.kafka.common.requests.EndTxnRequest;
 import org.apache.kafka.common.requests.EndTxnResponse;
 import org.apache.kafka.common.requests.FindCoordinatorRequest;
 import org.apache.kafka.common.requests.FindCoordinatorResponse;
-import org.apache.kafka.common.requests.InitPidRequest;
-import org.apache.kafka.common.requests.InitPidResponse;
+import org.apache.kafka.common.requests.InitProducerIdRequest;
+import org.apache.kafka.common.requests.InitProducerIdResponse;
 import org.apache.kafka.common.requests.ProduceRequest;
 import org.apache.kafka.common.requests.ProduceResponse;
 import org.apache.kafka.common.requests.TransactionResult;
@@ -163,7 +163,7 @@ public class TransactionManagerTest {
 
         sender.run(time.milliseconds());  // get pid.
 
-        assertTrue(transactionManager.hasPid());
+        assertTrue(transactionManager.hasProducerId());
         transactionManager.beginTransaction();
         transactionManager.maybeAddPartitionToTransaction(tp0);
 
@@ -275,7 +275,7 @@ public class TransactionManagerTest {
 
         assertEquals(null, transactionManager.coordinator(FindCoordinatorRequest.CoordinatorType.TRANSACTION));
         assertFalse(initPidResult.isCompleted());
-        assertFalse(transactionManager.hasPid());
+        assertFalse(transactionManager.hasProducerId());
 
         prepareFindCoordinatorResponse(Errors.NONE, false, FindCoordinatorRequest.CoordinatorType.TRANSACTION, transactionalId);
         sender.run(time.milliseconds());
@@ -285,7 +285,7 @@ public class TransactionManagerTest {
         sender.run(time.milliseconds());  // get pid and epoch
 
         assertTrue(initPidResult.isCompleted()); // The future should only return after the second round of retries succeed.
-        assertTrue(transactionManager.hasPid());
+        assertTrue(transactionManager.hasProducerId());
         assertEquals(pid, transactionManager.pidAndEpoch().producerId);
         assertEquals(epoch, transactionManager.pidAndEpoch().epoch);
     }
@@ -308,7 +308,7 @@ public class TransactionManagerTest {
 
         sender.run(time.milliseconds());  // get pid.
 
-        assertTrue(transactionManager.hasPid());
+        assertTrue(transactionManager.hasProducerId());
 
         transactionManager.beginTransaction();
         transactionManager.maybeAddPartitionToTransaction(tp0);
@@ -365,7 +365,7 @@ public class TransactionManagerTest {
 
         sender.run(time.milliseconds());  // get pid.
 
-        assertTrue(transactionManager.hasPid());
+        assertTrue(transactionManager.hasProducerId());
         transactionManager.beginTransaction();
         // User does one producer.sed
         transactionManager.maybeAddPartitionToTransaction(tp0);
@@ -428,7 +428,7 @@ public class TransactionManagerTest {
 
         sender.run(time.milliseconds());  // get pid.
 
-        assertTrue(transactionManager.hasPid());
+        assertTrue(transactionManager.hasProducerId());
         transactionManager.beginTransaction();
         transactionManager.maybeAddPartitionToTransaction(tp0);
 
@@ -463,7 +463,7 @@ public class TransactionManagerTest {
 
         sender.run(time.milliseconds());  // get pid.
 
-        assertTrue(transactionManager.hasPid());
+        assertTrue(transactionManager.hasProducerId());
         transactionManager.beginTransaction();
         transactionManager.maybeAddPartitionToTransaction(tp0);
 
@@ -530,12 +530,12 @@ public class TransactionManagerTest {
         client.prepareResponse(new MockClient.RequestMatcher() {
             @Override
             public boolean matches(AbstractRequest body) {
-                InitPidRequest initPidRequest = (InitPidRequest) body;
-                assertEquals(initPidRequest.transactionalId(), transactionalId);
-                assertEquals(initPidRequest.transactionTimeoutMs(), transactionTimeoutMs);
+                InitProducerIdRequest initProducerIdRequest = (InitProducerIdRequest) body;
+                assertEquals(initProducerIdRequest.transactionalId(), transactionalId);
+                assertEquals(initProducerIdRequest.transactionTimeoutMs(), transactionTimeoutMs);
                 return true;
             }
-        }, new InitPidResponse(0, error, pid, epoch), shouldDisconnect);
+        }, new InitProducerIdResponse(0, error, pid, epoch), shouldDisconnect);
     }
 
     private void prepareProduceResponse(Errors error, final long pid, final short epoch) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/a1c8e7d9/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
index b1e83bf..cbfb6a9 100644
--- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
@@ -879,12 +879,12 @@ public class RequestResponseTest {
         return new DeleteTopicsResponse(errors);
     }
 
-    private InitPidRequest createInitPidRequest() {
-        return new InitPidRequest.Builder(null, 100).build();
+    private InitProducerIdRequest createInitPidRequest() {
+        return new InitProducerIdRequest.Builder(null, 100).build();
     }
 
-    private InitPidResponse createInitPidResponse() {
-        return new InitPidResponse(0, Errors.NONE, 3332, (short) 3);
+    private InitProducerIdResponse createInitPidResponse() {
+        return new InitProducerIdResponse(0, Errors.NONE, 3332, (short) 3);
     }
 
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/a1c8e7d9/core/src/main/scala/kafka/coordinator/transaction/ProducerIdManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/transaction/ProducerIdManager.scala b/core/src/main/scala/kafka/coordinator/transaction/ProducerIdManager.scala
index bb7f57b..916ffa9 100644
--- a/core/src/main/scala/kafka/coordinator/transaction/ProducerIdManager.scala
+++ b/core/src/main/scala/kafka/coordinator/transaction/ProducerIdManager.scala
@@ -20,49 +20,49 @@ import kafka.common.KafkaException
 import kafka.utils.{Json, Logging, ZkUtils}
 
 /**
- * ProducerIdManager is the part of the transaction coordinator that provides ProducerIds (PIDs) in a unique way
- * such that the same PID will not be assigned twice across multiple transaction coordinators.
+ * ProducerIdManager is the part of the transaction coordinator that provides ProducerIds in a unique way
+ * such that the same producerId will not be assigned twice across multiple transaction coordinators.
  *
- * PIDs are managed via ZooKeeper, where the latest pid block is written on the corresponding ZK path by the manager who
- * claims the block, where the written block_start_pid and block_end_pid are both inclusive.
+ * ProducerIds are managed via ZooKeeper, where the latest producerId block is written on the corresponding ZK
+ * path by the manager who claims the block, where the written block_start and block_end are both inclusive.
  */
 object ProducerIdManager extends Logging {
   val CurrentVersion: Long = 1L
   val PidBlockSize: Long = 1000L
 
-  def generatePidBlockJson(pidBlock: ProducerIdBlock): String = {
+  def generateProducerIdBlockJson(producerIdBlock: ProducerIdBlock): String = {
     Json.encode(Map("version" -> CurrentVersion,
-      "broker" -> pidBlock.brokerId,
-      "block_start" -> pidBlock.blockStartPid.toString,
-      "block_end" -> pidBlock.blockEndPid.toString)
+      "broker" -> producerIdBlock.brokerId,
+      "block_start" -> producerIdBlock.blockStartId.toString,
+      "block_end" -> producerIdBlock.blockEndId.toString)
     )
   }
 
-  def parsePidBlockData(jsonData: String): ProducerIdBlock = {
+  def parseProducerIdBlockData(jsonData: String): ProducerIdBlock = {
     try {
       Json.parseFull(jsonData).flatMap { m =>
-        val pidBlockInfo = m.asInstanceOf[Map[String, Any]]
-        val brokerId = pidBlockInfo("broker").asInstanceOf[Int]
-        val blockStartPID = pidBlockInfo("block_start").asInstanceOf[String].toLong
-        val blockEndPID = pidBlockInfo("block_end").asInstanceOf[String].toLong
-        Some(ProducerIdBlock(brokerId, blockStartPID, blockEndPID))
-      }.getOrElse(throw new KafkaException(s"Failed to parse the pid block json $jsonData"))
+        val producerIdBlockInfo = m.asInstanceOf[Map[String, Any]]
+        val brokerId = producerIdBlockInfo("broker").asInstanceOf[Int]
+        val blockStart = producerIdBlockInfo("block_start").asInstanceOf[String].toLong
+        val blockEnd = producerIdBlockInfo("block_end").asInstanceOf[String].toLong
+        Some(ProducerIdBlock(brokerId, blockStart, blockEnd))
+      }.getOrElse(throw new KafkaException(s"Failed to parse the producerId block json $jsonData"))
     } catch {
       case e: java.lang.NumberFormatException =>
         // this should never happen: the written data has exceeded long type limit
-        fatal(s"Read jason data $jsonData contains pids that have exceeded long type limit")
+        fatal(s"Read jason data $jsonData contains producerIds that have exceeded long type limit")
         throw e
     }
   }
 }
 
-case class ProducerIdBlock(brokerId: Int, blockStartPid: Long, blockEndPid: Long) {
+case class ProducerIdBlock(brokerId: Int, blockStartId: Long, blockEndId: Long) {
   override def toString: String = {
-    val pidBlockInfo = new StringBuilder
-    pidBlockInfo.append("(brokerId:" + brokerId)
-    pidBlockInfo.append(",blockStartPID:" + blockStartPid)
-    pidBlockInfo.append(",blockEndPID:" + blockEndPid + ")")
-    pidBlockInfo.toString()
+    val producerIdBlockInfo = new StringBuilder
+    producerIdBlockInfo.append("(brokerId:" + brokerId)
+    producerIdBlockInfo.append(",blockStartProducerId:" + blockStartId)
+    producerIdBlockInfo.append(",blockEndProducerId:" + blockEndId + ")")
+    producerIdBlockInfo.toString()
   }
 }
 
@@ -70,84 +70,85 @@ class ProducerIdManager(val brokerId: Int, val zkUtils: ZkUtils) extends Logging
 
   this.logIdent = "[ProducerId Manager " + brokerId + "]: "
 
-  private var currentPIDBlock: ProducerIdBlock = null
-  private var nextPID: Long = -1L
+  private var currentProducerIdBlock: ProducerIdBlock = null
+  private var nextProducerId: Long = -1L
 
-  // grab the first block of PIDs
+  // grab the first block of producerIds
   this synchronized {
-    getNewPidBlock()
-    nextPID = currentPIDBlock.blockStartPid
+    getNewProducerIdBlock()
+    nextProducerId = currentProducerIdBlock.blockStartId
   }
 
-  private def getNewPidBlock(): Unit = {
+  private def getNewProducerIdBlock(): Unit = {
     var zkWriteComplete = false
     while (!zkWriteComplete) {
-      // refresh current pid block from zookeeper again
-      val (dataOpt, zkVersion) = zkUtils.readDataAndVersionMaybeNull(ZkUtils.PidBlockPath)
+      // refresh current producerId block from zookeeper again
+      val (dataOpt, zkVersion) = zkUtils.readDataAndVersionMaybeNull(ZkUtils.ProducerIdBlockPath)
 
-      // generate the new pid block
-      currentPIDBlock = dataOpt match {
+      // generate the new producerId block
+      currentProducerIdBlock = dataOpt match {
         case Some(data) =>
-          val currPIDBlock = ProducerIdManager.parsePidBlockData(data)
-          debug(s"Read current pid block $currPIDBlock, Zk path version $zkVersion")
+          val currProducerIdBlock = ProducerIdManager.parseProducerIdBlockData(data)
+          debug(s"Read current producerId block $currProducerIdBlock, Zk path version $zkVersion")
 
-          if (currPIDBlock.blockEndPid > Long.MaxValue - ProducerIdManager.PidBlockSize) {
-            // we have exhausted all pids (wow!), treat it as a fatal error
-            fatal(s"Exhausted all pids as the next block's end pid is will has exceeded long type limit (current block end pid is ${currPIDBlock.blockEndPid})")
-            throw new KafkaException("Have exhausted all pids.")
+          if (currProducerIdBlock.blockEndId > Long.MaxValue - ProducerIdManager.PidBlockSize) {
+            // we have exhausted all producerIds (wow!), treat it as a fatal error
+            fatal(s"Exhausted all producerIds as the next block's end producerId is will has exceeded long type limit (current block end producerId is ${currProducerIdBlock.blockEndId})")
+            throw new KafkaException("Have exhausted all producerIds.")
           }
 
-          ProducerIdBlock(brokerId, currPIDBlock.blockEndPid + 1L, currPIDBlock.blockEndPid + ProducerIdManager.PidBlockSize)
+          ProducerIdBlock(brokerId, currProducerIdBlock.blockEndId + 1L, currProducerIdBlock.blockEndId + ProducerIdManager.PidBlockSize)
         case None =>
-          debug(s"There is no pid block yet (Zk path version $zkVersion), creating the first block")
+          debug(s"There is no producerId block yet (Zk path version $zkVersion), creating the first block")
           ProducerIdBlock(brokerId, 0L, ProducerIdManager.PidBlockSize - 1)
       }
 
-      val newPIDBlockData = ProducerIdManager.generatePidBlockJson(currentPIDBlock)
+      val newProducerIdBlockData = ProducerIdManager.generateProducerIdBlockJson(currentProducerIdBlock)
 
-      // try to write the new pid block into zookeeper
-      val (succeeded, version) = zkUtils.conditionalUpdatePersistentPath(ZkUtils.PidBlockPath, newPIDBlockData, zkVersion, Some(checkPidBlockZkData))
+      // try to write the new producerId block into zookeeper
+      val (succeeded, version) = zkUtils.conditionalUpdatePersistentPath(ZkUtils.ProducerIdBlockPath,
+        newProducerIdBlockData, zkVersion, Some(checkProducerIdBlockZkData))
       zkWriteComplete = succeeded
 
       if (zkWriteComplete)
-        info(s"Acquired new pid block $currentPIDBlock by writing to Zk with path version $version")
+        info(s"Acquired new producerId block $currentProducerIdBlock by writing to Zk with path version $version")
     }
   }
 
-  private def checkPidBlockZkData(zkUtils: ZkUtils, path: String, expectedData: String): (Boolean, Int) = {
+  private def checkProducerIdBlockZkData(zkUtils: ZkUtils, path: String, expectedData: String): (Boolean, Int) = {
     try {
-      val expectedPidBlock = ProducerIdManager.parsePidBlockData(expectedData)
-      val (dataOpt, zkVersion) = zkUtils.readDataAndVersionMaybeNull(ZkUtils.PidBlockPath)
+      val expectedPidBlock = ProducerIdManager.parseProducerIdBlockData(expectedData)
+      val (dataOpt, zkVersion) = zkUtils.readDataAndVersionMaybeNull(ZkUtils.ProducerIdBlockPath)
       dataOpt match {
         case Some(data) =>
-          val currPIDBlock = ProducerIdManager.parsePidBlockData(data)
-          (currPIDBlock.equals(expectedPidBlock), zkVersion)
+          val currProducerIdBLock = ProducerIdManager.parseProducerIdBlockData(data)
+          (currProducerIdBLock == expectedPidBlock, zkVersion)
         case None =>
           (false, -1)
       }
     } catch {
       case e: Exception =>
-        warn(s"Error while checking for pid block Zk data on path $path: expected data $expectedData", e)
-        
+        warn(s"Error while checking for producerId block Zk data on path $path: expected data $expectedData", e)
+
         (false, -1)
     }
   }
 
-  def nextPid(): Long = {
+  def generateProducerId(): Long = {
     this synchronized {
-      // grab a new block of PIDs if this block has been exhausted
-      if (nextPID > currentPIDBlock.blockEndPid) {
-        getNewPidBlock()
-        nextPID = currentPIDBlock.blockStartPid + 1
+      // grab a new block of producerIds if this block has been exhausted
+      if (nextProducerId > currentProducerIdBlock.blockEndId) {
+        getNewProducerIdBlock()
+        nextProducerId = currentProducerIdBlock.blockStartId + 1
       } else {
-        nextPID += 1
+        nextProducerId += 1
       }
 
-      nextPID - 1
+      nextProducerId - 1
     }
   }
 
   def shutdown() {
-    info(s"Shutdown complete: last PID assigned $nextPID")
+    info(s"Shutdown complete: last producerId assigned $nextProducerId")
   }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/a1c8e7d9/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala b/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
index 7632f3f..233f7d7 100644
--- a/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
+++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
@@ -47,20 +47,20 @@ object TransactionCoordinator {
       config.transactionTopicMinISR,
       config.transactionTransactionsExpiredTransactionCleanupIntervalMs)
 
-    val pidManager = new ProducerIdManager(config.brokerId, zkUtils)
+    val producerIdManager = new ProducerIdManager(config.brokerId, zkUtils)
     val txnStateManager = new TransactionStateManager(config.brokerId, zkUtils, scheduler, replicaManager, txnConfig, time)
     val txnMarkerPurgatory = DelayedOperationPurgatory[DelayedTxnMarker]("txn-marker-purgatory", config.brokerId, reaperEnabled = false)
     val txnMarkerChannelManager = TransactionMarkerChannelManager(config, metrics, metadataCache, txnStateManager, txnMarkerPurgatory, time)
 
-    new TransactionCoordinator(config.brokerId, scheduler, pidManager, txnStateManager, txnMarkerChannelManager, txnMarkerPurgatory, time)
+    new TransactionCoordinator(config.brokerId, scheduler, producerIdManager, txnStateManager, txnMarkerChannelManager, txnMarkerPurgatory, time)
   }
 
-  private def initTransactionError(error: Errors): InitPidResult = {
-    InitPidResult(RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, error)
+  private def initTransactionError(error: Errors): InitProducerIdResult = {
+    InitProducerIdResult(RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, error)
   }
 
-  private def initTransactionMetadata(txnMetadata: TransactionMetadataTransition): InitPidResult = {
-    InitPidResult(txnMetadata.producerId, txnMetadata.producerEpoch, Errors.NONE)
+  private def initTransactionMetadata(txnMetadata: TransactionMetadataTransition): InitProducerIdResult = {
+    InitProducerIdResult(txnMetadata.producerId, txnMetadata.producerEpoch, Errors.NONE)
   }
 }
 
@@ -74,7 +74,7 @@ object TransactionCoordinator {
  */
 class TransactionCoordinator(brokerId: Int,
                              scheduler: Scheduler,
-                             pidManager: ProducerIdManager,
+                             producerIdManager: ProducerIdManager,
                              txnManager: TransactionStateManager,
                              txnMarkerChannelManager: TransactionMarkerChannelManager,
                              txnMarkerPurgatory: DelayedOperationPurgatory[DelayedTxnMarker],
@@ -83,22 +83,21 @@ class TransactionCoordinator(brokerId: Int,
 
   import TransactionCoordinator._
 
-  type InitPidCallback = InitPidResult => Unit
+  type InitProducerIdCallback = InitProducerIdResult => Unit
   type AddPartitionsCallback = Errors => Unit
   type EndTxnCallback = Errors => Unit
 
   /* Active flag of the coordinator */
   private val isActive = new AtomicBoolean(false)
 
-  def handleInitPid(transactionalId: String,
-                    transactionTimeoutMs: Int,
-                    responseCallback: InitPidCallback): Unit = {
-
+  def handleInitProducerId(transactionalId: String,
+                           transactionTimeoutMs: Int,
+                           responseCallback: InitProducerIdCallback): Unit = {
     if (transactionalId == null || transactionalId.isEmpty) {
       // if the transactional id is not specified, then always blindly accept the request
-      // and return a new pid from the pid manager
-      val pid = pidManager.nextPid()
-      responseCallback(InitPidResult(pid, epoch = 0, Errors.NONE))
+      // and return a new producerId from the producerId manager
+      val producerId = producerIdManager.generateProducerId()
+      responseCallback(InitProducerIdResult(producerId, producerEpoch = 0, Errors.NONE))
     } else if (!txnManager.isCoordinatorFor(transactionalId)) {
       // check if it is the assigned coordinator for the transactional id
       responseCallback(initTransactionError(Errors.NOT_COORDINATOR))
@@ -108,12 +107,12 @@ class TransactionCoordinator(brokerId: Int,
       // check transactionTimeoutMs is not larger than the broker configured maximum allowed value
       responseCallback(initTransactionError(Errors.INVALID_TRANSACTION_TIMEOUT))
     } else {
-      // only try to get a new pid and update the cache if the transactional id is unknown
-      val result: Either[InitPidResult, (Int, TransactionMetadataTransition)] = txnManager.getTransactionState(transactionalId) match {
+      // only try to get a new producerId and update the cache if the transactional id is unknown
+      val result: Either[InitProducerIdResult, (Int, TransactionMetadataTransition)] = txnManager.getTransactionState(transactionalId) match {
         case None =>
-          val pid = pidManager.nextPid()
+          val producerId = producerIdManager.generateProducerId()
           val now = time.milliseconds()
-          val createdMetadata = new TransactionMetadata(producerId = pid,
+          val createdMetadata = new TransactionMetadata(producerId = producerId,
             producerEpoch = 0,
             txnTimeoutMs = transactionTimeoutMs,
             state = Empty,
@@ -129,7 +128,7 @@ class TransactionCoordinator(brokerId: Int,
           // in this case we will treat it as the metadata has existed already
           txnMetadata synchronized {
             if (!txnMetadata.eq(createdMetadata)) {
-              initPidWithExistingMetadata(transactionalId, transactionTimeoutMs, coordinatorEpoch, txnMetadata)
+              initProducerIdWithExistingMetadata(transactionalId, transactionTimeoutMs, coordinatorEpoch, txnMetadata)
             } else {
               Right(coordinatorEpoch, txnMetadata.prepareNewPid(time.milliseconds()))
             }
@@ -140,13 +139,13 @@ class TransactionCoordinator(brokerId: Int,
           val txnMetadata = existingEpochAndMetadata.transactionMetadata
 
           txnMetadata synchronized {
-            initPidWithExistingMetadata(transactionalId, transactionTimeoutMs, coordinatorEpoch, txnMetadata)
+            initProducerIdWithExistingMetadata(transactionalId, transactionTimeoutMs, coordinatorEpoch, txnMetadata)
           }
       }
 
       result match {
-        case Left(pidResult) =>
-          responseCallback(pidResult)
+        case Left(producerIdResult) =>
+          responseCallback(producerIdResult)
 
         case Right((coordinatorEpoch, newMetadata)) =>
           if (newMetadata.txnState == Ongoing) {
@@ -178,11 +177,10 @@ class TransactionCoordinator(brokerId: Int,
     }
   }
 
-  private def initPidWithExistingMetadata(transactionalId: String,
-                                          transactionTimeoutMs: Int,
-                                          coordinatorEpoch: Int,
-                                          txnMetadata: TransactionMetadata): Either[InitPidResult, (Int, TransactionMetadataTransition)] = {
-
+  private def initProducerIdWithExistingMetadata(transactionalId: String,
+                                                 transactionTimeoutMs: Int,
+                                                 coordinatorEpoch: Int,
+                                                 txnMetadata: TransactionMetadata): Either[InitProducerIdResult, (Int, TransactionMetadataTransition)] = {
     if (txnMetadata.pendingTransitionInProgress) {
       // return a retriable exception to let the client backoff and retry
       Left(initTransactionError(Errors.CONCURRENT_TRANSACTIONS))
@@ -216,8 +214,8 @@ class TransactionCoordinator(brokerId: Int,
 
 
   def handleAddPartitionsToTransaction(transactionalId: String,
-                                       pid: Long,
-                                       epoch: Short,
+                                       producerId: Long,
+                                       producerEpoch: Short,
                                        partitions: collection.Set[TopicPartition],
                                        responseCallback: AddPartitionsCallback): Unit = {
     val error = validateTransactionalId(transactionalId)
@@ -225,10 +223,10 @@ class TransactionCoordinator(brokerId: Int,
       responseCallback(error)
     } else {
       // try to update the transaction metadata and append the updated metadata to txn log;
-      // if there is no such metadata treat it as invalid pid mapping error.
+      // if there is no such metadata treat it as invalid producerId mapping error.
       val result: Either[Errors, (Int, TransactionMetadataTransition)] = txnManager.getTransactionState(transactionalId) match {
         case None =>
-          Left(Errors.INVALID_PID_MAPPING)
+          Left(Errors.INVALID_PRODUCER_ID_MAPPING)
 
         case Some(epochAndMetadata) =>
           val coordinatorEpoch = epochAndMetadata.coordinatorEpoch
@@ -236,9 +234,9 @@ class TransactionCoordinator(brokerId: Int,
 
           // generate the new transaction metadata with added partitions
           txnMetadata synchronized {
-            if (txnMetadata.producerId != pid) {
-              Left(Errors.INVALID_PID_MAPPING)
-            } else if (txnMetadata.producerEpoch != epoch) {
+            if (txnMetadata.producerId != producerId) {
+              Left(Errors.INVALID_PRODUCER_ID_MAPPING)
+            } else if (txnMetadata.producerEpoch != producerEpoch) {
               Left(Errors.INVALID_PRODUCER_EPOCH)
             } else if (txnMetadata.pendingTransitionInProgress) {
               // return a retriable exception to let the client backoff and retry
@@ -274,8 +272,8 @@ class TransactionCoordinator(brokerId: Int,
   }
 
   def handleEndTransaction(transactionalId: String,
-                           pid: Long,
-                           epoch: Short,
+                           producerId: Long,
+                           producerEpoch: Short,
                            txnMarkerResult: TransactionResult,
                            responseCallback: EndTxnCallback): Unit = {
     val error = validateTransactionalId(transactionalId)
@@ -284,16 +282,16 @@ class TransactionCoordinator(brokerId: Int,
     else {
       val preAppendResult: Either[Errors, (Int, TransactionMetadataTransition)] = txnManager.getTransactionState(transactionalId) match {
         case None =>
-          Left(Errors.INVALID_PID_MAPPING)
+          Left(Errors.INVALID_PRODUCER_ID_MAPPING)
 
         case Some(epochAndTxnMetadata) =>
           val txnMetadata = epochAndTxnMetadata.transactionMetadata
           val coordinatorEpoch = epochAndTxnMetadata.coordinatorEpoch
 
           txnMetadata synchronized {
-            if (txnMetadata.producerId != pid)
-              Left(Errors.INVALID_PID_MAPPING)
-            else if (txnMetadata.producerEpoch != epoch)
+            if (txnMetadata.producerId != producerId)
+              Left(Errors.INVALID_PRODUCER_ID_MAPPING)
+            else if (txnMetadata.producerEpoch != producerEpoch)
               Left(Errors.INVALID_PRODUCER_EPOCH)
             else if (txnMetadata.pendingTransitionInProgress)
               Left(Errors.CONCURRENT_TRANSACTIONS)
@@ -343,9 +341,9 @@ class TransactionCoordinator(brokerId: Int,
 
                     val txnMetadata = epochAndMetadata.transactionMetadata
                     txnMetadata synchronized {
-                      if (txnMetadata.producerId != pid)
-                        Left(Errors.INVALID_PID_MAPPING)
-                      else if (txnMetadata.producerEpoch != epoch)
+                      if (txnMetadata.producerId != producerId)
+                        Left(Errors.INVALID_PRODUCER_ID_MAPPING)
+                      else if (txnMetadata.producerEpoch != producerEpoch)
                         Left(Errors.INVALID_PRODUCER_EPOCH)
                       else if (txnMetadata.pendingTransitionInProgress)
                         Left(Errors.CONCURRENT_TRANSACTIONS)
@@ -452,11 +450,11 @@ class TransactionCoordinator(brokerId: Int,
     isActive.set(false)
     scheduler.shutdown()
     txnMarkerPurgatory.shutdown()
-    pidManager.shutdown()
+    producerIdManager.shutdown()
     txnManager.shutdown()
     txnMarkerChannelManager.shutdown()
     info("Shutdown complete.")
   }
 }
 
-case class InitPidResult(pid: Long, epoch: Short, error: Errors)
+case class InitProducerIdResult(producerId: Long, producerEpoch: Short, error: Errors)

http://git-wip-us.apache.org/repos/asf/kafka/blob/a1c8e7d9/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala b/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala
index b7a2e80..90c9c42 100644
--- a/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala
+++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala
@@ -251,7 +251,9 @@ class TransactionMarkerChannelManager(config: KafkaConfig,
     addTxnMarkersToBrokerQueue(transactionalId, txnMetadata.producerId, txnMetadata.producerEpoch, txnResult, coordinatorEpoch, txnMetadata.topicPartitions.toSet)
   }
 
-  def addTxnMarkersToBrokerQueue(transactionalId: String, pid: Long, epoch: Short, result: TransactionResult, coordinatorEpoch: Int, topicPartitions: immutable.Set[TopicPartition]): Unit = {
+  def addTxnMarkersToBrokerQueue(transactionalId: String, producerId: Long, producerEpoch: Short,
+                                 result: TransactionResult, coordinatorEpoch: Int,
+                                 topicPartitions: immutable.Set[TopicPartition]): Unit = {
     val txnTopicPartition = txnStateManager.partitionFor(transactionalId)
     val partitionsByDestination: immutable.Map[Node, immutable.Set[TopicPartition]] = topicPartitions.groupBy { topicPartition: TopicPartition =>
       var brokerNode: Option[Node] = None
@@ -269,7 +271,8 @@ class TransactionMarkerChannelManager(config: KafkaConfig,
     }
 
     for ((broker: Node, topicPartitions: immutable.Set[TopicPartition]) <- partitionsByDestination) {
-      val txnIdAndMarker = TxnIdAndMarkerEntry(transactionalId, new TxnMarkerEntry(pid, epoch, coordinatorEpoch, result, topicPartitions.toList.asJava))
+      val marker = new TxnMarkerEntry(producerId, producerEpoch, coordinatorEpoch, result, topicPartitions.toList.asJava)
+      val txnIdAndMarker = TxnIdAndMarkerEntry(transactionalId, marker)
       addMarkersForBroker(broker, txnTopicPartition, txnIdAndMarker)
     }