You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2017/10/10 22:03:50 UTC

kafka git commit: MINOR: Avoid some unnecessary collection copies in KafkaApis

Repository: kafka
Updated Branches:
  refs/heads/trunk 90b5ce3f0 -> 1027ff3c7


MINOR: Avoid some unnecessary collection copies in KafkaApis

Author: Jason Gustafson <ja...@confluent.io>

Reviewers: Manikumar Reddy <ma...@gmail.com>, Ismael Juma <is...@juma.me.uk>

Closes #4035 from hachikuji/KAFKA-5547-followup and squashes the following commits:

f6b04ce1a [Jason Gustafson] Add a couple missed common fields
d3473b14d [Jason Gustafson] Fix compilation errors and a few warnings
58a0ae695 [Jason Gustafson] MINOR: Avoid some unnecessary collection copies in KafkaApis


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/1027ff3c
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/1027ff3c
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/1027ff3c

Branch: refs/heads/trunk
Commit: 1027ff3c769906b7b80582217cf5b4703fd6864d
Parents: 90b5ce3
Author: Jason Gustafson <ja...@confluent.io>
Authored: Tue Oct 10 15:01:10 2017 -0700
Committer: Jason Gustafson <ja...@confluent.io>
Committed: Tue Oct 10 15:01:10 2017 -0700

----------------------------------------------------------------------
 .../kafka/common/protocol/CommonFields.java     |   2 +
 .../common/requests/FindCoordinatorRequest.java |  13 +-
 .../common/requests/InitProducerIdRequest.java  |   9 +-
 .../kafka/common/requests/ProduceRequest.java   |  13 +-
 .../kafka/admin/ReassignPartitionsCommand.scala |   2 +-
 .../src/main/scala/kafka/server/KafkaApis.scala | 180 ++++++++-----------
 core/src/main/scala/kafka/utils/ZkUtils.scala   |   3 +-
 .../main/scala/kafka/utils/json/JsonValue.scala |   2 -
 8 files changed, 96 insertions(+), 128 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/1027ff3c/clients/src/main/java/org/apache/kafka/common/protocol/CommonFields.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/CommonFields.java b/clients/src/main/java/org/apache/kafka/common/protocol/CommonFields.java
index 472a791..e1d1884 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/CommonFields.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/CommonFields.java
@@ -35,6 +35,8 @@ public class CommonFields {
 
     // Transactional APIs
     public static final Field.Str TRANSACTIONAL_ID = new Field.Str("transactional_id", "The transactional id corresponding to the transaction.");
+    public static final Field.NullableStr NULLABLE_TRANSACTIONAL_ID = new Field.NullableStr("transactional_id",
+            "The transactional id or null if the producer is not transactional");
     public static final Field.Int64 PRODUCER_ID = new Field.Int64("producer_id", "Current producer id in use by the transactional id.");
     public static final Field.Int16 PRODUCER_EPOCH = new Field.Int16("producer_epoch", "Current epoch associated with the producer id.");
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/1027ff3c/clients/src/main/java/org/apache/kafka/common/requests/FindCoordinatorRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/FindCoordinatorRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/FindCoordinatorRequest.java
index c94bcde..9bfc968 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/FindCoordinatorRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/FindCoordinatorRequest.java
@@ -26,16 +26,15 @@ import org.apache.kafka.common.protocol.types.Struct;
 
 import java.nio.ByteBuffer;
 
+import static org.apache.kafka.common.protocol.CommonFields.GROUP_ID;
 import static org.apache.kafka.common.protocol.types.Type.INT8;
 import static org.apache.kafka.common.protocol.types.Type.STRING;
 
 public class FindCoordinatorRequest extends AbstractRequest {
-    private static final String GROUP_ID_KEY_NAME = "group_id";
     private static final String COORDINATOR_KEY_KEY_NAME = "coordinator_key";
     private static final String COORDINATOR_TYPE_KEY_NAME = "coordinator_type";
 
-    private static final Schema FIND_COORDINATOR_REQUEST_V0 = new Schema(
-            new Field("group_id", STRING, "The unique group id."));
+    private static final Schema FIND_COORDINATOR_REQUEST_V0 = new Schema(GROUP_ID);
 
     private static final Schema FIND_COORDINATOR_REQUEST_V1 = new Schema(
             new Field("coordinator_key", STRING, "Id to use for finding the coordinator (for groups, this is the groupId, " +
@@ -102,8 +101,8 @@ public class FindCoordinatorRequest extends AbstractRequest {
             this.coordinatorType = CoordinatorType.forId(struct.getByte(COORDINATOR_TYPE_KEY_NAME));
         else
             this.coordinatorType = CoordinatorType.GROUP;
-        if (struct.hasField(GROUP_ID_KEY_NAME))
-            this.coordinatorKey = struct.getString(GROUP_ID_KEY_NAME);
+        if (struct.hasField(GROUP_ID))
+            this.coordinatorKey = struct.get(GROUP_ID);
         else
             this.coordinatorKey = struct.getString(COORDINATOR_KEY_KEY_NAME);
     }
@@ -138,8 +137,8 @@ public class FindCoordinatorRequest extends AbstractRequest {
     @Override
     protected Struct toStruct() {
         Struct struct = new Struct(ApiKeys.FIND_COORDINATOR.requestSchema(version()));
-        if (struct.hasField(GROUP_ID_KEY_NAME))
-            struct.set(GROUP_ID_KEY_NAME, coordinatorKey);
+        if (struct.hasField(GROUP_ID))
+            struct.set(GROUP_ID, coordinatorKey);
         else
             struct.set(COORDINATOR_KEY_KEY_NAME, coordinatorKey);
         if (struct.hasField(COORDINATOR_TYPE_KEY_NAME))

http://git-wip-us.apache.org/repos/asf/kafka/blob/1027ff3c/clients/src/main/java/org/apache/kafka/common/requests/InitProducerIdRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/InitProducerIdRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/InitProducerIdRequest.java
index fa14a97..6c659ff 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/InitProducerIdRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/InitProducerIdRequest.java
@@ -24,17 +24,16 @@ import org.apache.kafka.common.protocol.types.Struct;
 
 import java.nio.ByteBuffer;
 
+import static org.apache.kafka.common.protocol.CommonFields.NULLABLE_TRANSACTIONAL_ID;
 import static org.apache.kafka.common.protocol.types.Type.INT32;
-import static org.apache.kafka.common.protocol.types.Type.NULLABLE_STRING;
 
 public class InitProducerIdRequest extends AbstractRequest {
     public static final int NO_TRANSACTION_TIMEOUT_MS = Integer.MAX_VALUE;
 
-    private static final String TRANSACTIONAL_ID_KEY_NAME = "transactional_id";
     private static final String TRANSACTION_TIMEOUT_KEY_NAME = "transaction_timeout_ms";
 
     private static final Schema INIT_PRODUCER_ID_REQUEST_V0 = new Schema(
-            new Field(TRANSACTIONAL_ID_KEY_NAME, NULLABLE_STRING, "The transactional id whose producer id we want to retrieve or generate."),
+            NULLABLE_TRANSACTIONAL_ID,
             new Field(TRANSACTION_TIMEOUT_KEY_NAME, INT32, "The time in ms to wait for before aborting idle transactions sent by this producer."));
 
     public static Schema[] schemaVersions() {
@@ -79,7 +78,7 @@ public class InitProducerIdRequest extends AbstractRequest {
 
     public InitProducerIdRequest(Struct struct, short version) {
         super(version);
-        this.transactionalId = struct.getString(TRANSACTIONAL_ID_KEY_NAME);
+        this.transactionalId = struct.get(NULLABLE_TRANSACTIONAL_ID);
         this.transactionTimeoutMs = struct.getInt(TRANSACTION_TIMEOUT_KEY_NAME);
     }
 
@@ -109,7 +108,7 @@ public class InitProducerIdRequest extends AbstractRequest {
     @Override
     protected Struct toStruct() {
         Struct struct = new Struct(ApiKeys.INIT_PRODUCER_ID.requestSchema(version()));
-        struct.set(TRANSACTIONAL_ID_KEY_NAME, transactionalId);
+        struct.set(NULLABLE_TRANSACTIONAL_ID, transactionalId);
         struct.set(TRANSACTION_TIMEOUT_KEY_NAME, transactionTimeoutMs);
         return struct;
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/1027ff3c/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java
index ee4a2e2..91e3aeb 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java
@@ -18,6 +18,7 @@ package org.apache.kafka.common.requests;
 
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.CommonFields;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.protocol.types.ArrayOf;
 import org.apache.kafka.common.protocol.types.Field;
@@ -39,15 +40,14 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 
+import static org.apache.kafka.common.protocol.CommonFields.NULLABLE_TRANSACTIONAL_ID;
 import static org.apache.kafka.common.protocol.CommonFields.PARTITION_ID;
 import static org.apache.kafka.common.protocol.CommonFields.TOPIC_NAME;
 import static org.apache.kafka.common.protocol.types.Type.INT16;
 import static org.apache.kafka.common.protocol.types.Type.INT32;
-import static org.apache.kafka.common.protocol.types.Type.NULLABLE_STRING;
 import static org.apache.kafka.common.protocol.types.Type.RECORDS;
 
 public class ProduceRequest extends AbstractRequest {
-    private static final String TRANSACTIONAL_ID_KEY_NAME = "transactional_id";
     private static final String ACKS_KEY_NAME = "acks";
     private static final String TIMEOUT_KEY_NAME = "timeout";
     private static final String TOPIC_DATA_KEY_NAME = "topic_data";
@@ -87,8 +87,7 @@ public class ProduceRequest extends AbstractRequest {
     // Produce request V3 adds the transactional id which is used for authorization when attempting to write
     // transactional data. This version also adds support for message format V2.
     private static final Schema PRODUCE_REQUEST_V3 = new Schema(
-            new Field(TRANSACTIONAL_ID_KEY_NAME, NULLABLE_STRING, "The transactional ID of the producer. This is used to " +
-                    "authorize transaction produce requests. This can be null for non-transactional producers."),
+            CommonFields.NULLABLE_TRANSACTIONAL_ID,
             new Field(ACKS_KEY_NAME, INT16, "The number of acknowledgments the producer requires the leader to have " +
                     "received before considering a request complete. Allowed values: 0 for no acknowledgments, 1 " +
                     "for only the leader and -1 for the full ISR."),
@@ -229,7 +228,7 @@ public class ProduceRequest extends AbstractRequest {
         partitionSizes = createPartitionSizes(partitionRecords);
         acks = struct.getShort(ACKS_KEY_NAME);
         timeout = struct.getInt(TIMEOUT_KEY_NAME);
-        transactionalId = struct.hasField(TRANSACTIONAL_ID_KEY_NAME) ? struct.getString(TRANSACTIONAL_ID_KEY_NAME) : null;
+        transactionalId = struct.getOrElse(NULLABLE_TRANSACTIONAL_ID, null);
     }
 
     private void validateRecords(short version, MemoryRecords records) {
@@ -268,9 +267,7 @@ public class ProduceRequest extends AbstractRequest {
         Map<String, Map<Integer, MemoryRecords>> recordsByTopic = CollectionUtils.groupDataByTopic(partitionRecords);
         struct.set(ACKS_KEY_NAME, acks);
         struct.set(TIMEOUT_KEY_NAME, timeout);
-
-        if (struct.hasField(TRANSACTIONAL_ID_KEY_NAME))
-            struct.set(TRANSACTIONAL_ID_KEY_NAME, transactionalId);
+        struct.setIfExists(NULLABLE_TRANSACTIONAL_ID, transactionalId);
 
         List<Struct> topicDatas = new ArrayList<>(recordsByTopic.size());
         for (Map.Entry<String, Map<Integer, MemoryRecords>> topicEntry : recordsByTopic.entrySet()) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/1027ff3c/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
index af81697..33884d6 100755
--- a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
+++ b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
@@ -30,7 +30,7 @@ import org.apache.kafka.common.utils.Utils
 import org.apache.kafka.common.security.JaasUtils
 import org.apache.kafka.common.TopicPartitionReplica
 import org.apache.kafka.common.errors.{LogDirNotFoundException, ReplicaNotAvailableException}
-import org.apache.kafka.clients.admin.{AdminClientConfig, AlterReplicaLogDirsOptions, DescribeReplicaLogDirsResult, AdminClient => JAdminClient}
+import org.apache.kafka.clients.admin.{AdminClientConfig, AlterReplicaLogDirsOptions, AdminClient => JAdminClient}
 import LogConfig._
 import joptsimple.OptionParser
 import org.apache.kafka.clients.admin.DescribeReplicaLogDirsResult.ReplicaLogDirInfo

http://git-wip-us.apache.org/repos/asf/kafka/blob/1027ff3c/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 8eceaa0..0066702 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -265,25 +265,24 @@ class KafkaApis(val requestChannel: RequestChannel,
       sendResponseMaybeThrottle(request, requestThrottleMs => new OffsetCommitResponse(requestThrottleMs, results.asJava))
     } else {
 
-      var unauthorizedTopics = Set[TopicPartition]()
-      var nonExistingTopics = Set[TopicPartition]()
-      var authorizedTopics = mutable.Map[TopicPartition, OffsetCommitRequest.PartitionData]()
+      val unauthorizedTopicErrors = mutable.Map[TopicPartition, Errors]()
+      val nonExistingTopicErrors = mutable.Map[TopicPartition, Errors]()
+      val authorizedTopicRequestInfoBldr = immutable.Map.newBuilder[TopicPartition, OffsetCommitRequest.PartitionData]
 
-      for ((topicPartition, partitionData) <- offsetCommitRequest.offsetData.asScala.toMap) {
+      for ((topicPartition, partitionData) <- offsetCommitRequest.offsetData.asScala) {
         if (!authorize(request.session, Read, new Resource(Topic, topicPartition.topic)))
-          unauthorizedTopics += topicPartition
+          unauthorizedTopicErrors += (topicPartition -> Errors.TOPIC_AUTHORIZATION_FAILED)
         else if (!metadataCache.contains(topicPartition.topic))
-          nonExistingTopics += topicPartition
+          nonExistingTopicErrors += (topicPartition -> Errors.UNKNOWN_TOPIC_OR_PARTITION)
         else
-          authorizedTopics += (topicPartition -> partitionData)
+          authorizedTopicRequestInfoBldr += (topicPartition -> partitionData)
       }
 
+      val authorizedTopicRequestInfo = authorizedTopicRequestInfoBldr.result()
+
       // the callback for sending an offset commit response
       def sendResponseCallback(commitStatus: immutable.Map[TopicPartition, Errors]) {
-        val combinedCommitStatus = commitStatus ++
-          unauthorizedTopics.map(_ -> Errors.TOPIC_AUTHORIZATION_FAILED) ++
-          nonExistingTopics.map(_ -> Errors.UNKNOWN_TOPIC_OR_PARTITION)
-
+        val combinedCommitStatus = commitStatus ++ unauthorizedTopicErrors ++ nonExistingTopicErrors
         if (isDebugEnabled)
           combinedCommitStatus.foreach { case (topicPartition, error) =>
             if (error != Errors.NONE) {
@@ -295,11 +294,11 @@ class KafkaApis(val requestChannel: RequestChannel,
           new OffsetCommitResponse(requestThrottleMs, combinedCommitStatus.asJava))
       }
 
-      if (authorizedTopics.isEmpty)
+      if (authorizedTopicRequestInfo.isEmpty)
         sendResponseCallback(Map.empty)
       else if (header.apiVersion == 0) {
         // for version 0 always store offsets to ZK
-        val responseInfo = authorizedTopics.map {
+        val responseInfo = authorizedTopicRequestInfo.map {
           case (topicPartition, partitionData) =>
             val topicDirs = new ZKGroupTopicDirs(offsetCommitRequest.groupId, topicPartition.topic)
             try {
@@ -313,7 +312,7 @@ class KafkaApis(val requestChannel: RequestChannel,
               case e: Throwable => (topicPartition, Errors.forException(e))
             }
         }
-        sendResponseCallback(responseInfo.toMap)
+        sendResponseCallback(responseInfo)
       } else {
         // for version 1 and beyond store offsets in offset manager
 
@@ -334,7 +333,7 @@ class KafkaApis(val requestChannel: RequestChannel,
         //   - If v2 we use the default expiration timestamp
         val currentTimestamp = time.milliseconds
         val defaultExpireTimestamp = offsetRetention + currentTimestamp
-        val partitionData = authorizedTopics.mapValues { partitionData =>
+        val partitionData = authorizedTopicRequestInfo.mapValues { partitionData =>
           val metadata = if (partitionData.metadata == null) OffsetMetadata.NoMetadata else partitionData.metadata
           new OffsetAndMetadata(
             offsetMetadata = OffsetMetadata(partitionData.offset, metadata),
@@ -353,7 +352,7 @@ class KafkaApis(val requestChannel: RequestChannel,
           offsetCommitRequest.groupId,
           offsetCommitRequest.memberId,
           offsetCommitRequest.generationId,
-          partitionData.toMap,
+          partitionData,
           sendResponseCallback)
       }
     }
@@ -381,15 +380,15 @@ class KafkaApis(val requestChannel: RequestChannel,
       return
     }
 
-    var unauthorizedTopics = Set[TopicPartition]()
-    var nonExistingTopics = Set[TopicPartition]()
-    var authorizedRequestInfo = mutable.Map[TopicPartition, MemoryRecords]()
+    val unauthorizedTopicResponses = mutable.Map[TopicPartition, PartitionResponse]()
+    val nonExistingTopicResponses = mutable.Map[TopicPartition, PartitionResponse]()
+    val authorizedRequestInfo = mutable.Map[TopicPartition, MemoryRecords]()
 
     for ((topicPartition, memoryRecords) <- produceRequest.partitionRecordsOrFail.asScala) {
       if (!authorize(request.session, Write, new Resource(Topic, topicPartition.topic)))
-        unauthorizedTopics += topicPartition
+        unauthorizedTopicResponses += topicPartition -> new PartitionResponse(Errors.TOPIC_AUTHORIZATION_FAILED)
       else if (!metadataCache.contains(topicPartition.topic))
-        nonExistingTopics += topicPartition
+        nonExistingTopicResponses += topicPartition -> new PartitionResponse(Errors.UNKNOWN_TOPIC_OR_PARTITION)
       else
         authorizedRequestInfo += (topicPartition -> memoryRecords)
     }
@@ -397,10 +396,7 @@ class KafkaApis(val requestChannel: RequestChannel,
     // the callback for sending a produce response
     def sendResponseCallback(responseStatus: Map[TopicPartition, PartitionResponse]) {
 
-      val mergedResponseStatus = responseStatus ++
-        unauthorizedTopics.map(_ -> new PartitionResponse(Errors.TOPIC_AUTHORIZATION_FAILED)) ++
-        nonExistingTopics.map(_ -> new PartitionResponse(Errors.UNKNOWN_TOPIC_OR_PARTITION))
-
+      val mergedResponseStatus = responseStatus ++ unauthorizedTopicResponses ++ nonExistingTopicResponses
       var errorInResponse = false
 
       mergedResponseStatus.foreach { case (topicPartition, status) =>
@@ -483,29 +479,23 @@ class KafkaApis(val requestChannel: RequestChannel,
     val versionId = request.header.apiVersion
     val clientId = request.header.clientId
 
-    var unauthorizedTopics = Set[TopicPartition]()
-    var nonExistingTopics = Set[TopicPartition]()
-    var authorizedRequestInfo = mutable.ArrayBuffer[(TopicPartition, FetchRequest.PartitionData)]()
+    val unauthorizedTopicResponseData = mutable.ArrayBuffer[(TopicPartition, FetchResponse.PartitionData)]()
+    val nonExistingTopicResponseData = mutable.ArrayBuffer[(TopicPartition, FetchResponse.PartitionData)]()
+    val authorizedRequestInfo = mutable.ArrayBuffer[(TopicPartition, FetchRequest.PartitionData)]()
 
     for ((topicPartition, partitionData) <- fetchRequest.fetchData.asScala) {
       if (!authorize(request.session, Read, new Resource(Topic, topicPartition.topic)))
-        unauthorizedTopics += topicPartition
+        unauthorizedTopicResponseData += topicPartition -> new FetchResponse.PartitionData(Errors.TOPIC_AUTHORIZATION_FAILED,
+          FetchResponse.INVALID_HIGHWATERMARK, FetchResponse.INVALID_LAST_STABLE_OFFSET,
+          FetchResponse.INVALID_LOG_START_OFFSET, null, MemoryRecords.EMPTY)
       else if (!metadataCache.contains(topicPartition.topic))
-        nonExistingTopics += topicPartition
+        nonExistingTopicResponseData += topicPartition -> new FetchResponse.PartitionData(Errors.UNKNOWN_TOPIC_OR_PARTITION,
+          FetchResponse.INVALID_HIGHWATERMARK, FetchResponse.INVALID_LAST_STABLE_OFFSET,
+          FetchResponse.INVALID_LOG_START_OFFSET, null, MemoryRecords.EMPTY)
       else
         authorizedRequestInfo += (topicPartition -> partitionData)
     }
 
-    val nonExistingPartitionData = nonExistingTopics.map {
-      case tp => (tp, new FetchResponse.PartitionData(Errors.UNKNOWN_TOPIC_OR_PARTITION,
-        FetchResponse.INVALID_HIGHWATERMARK, FetchResponse.INVALID_LAST_STABLE_OFFSET, FetchResponse.INVALID_LOG_START_OFFSET, null, MemoryRecords.EMPTY))
-    }
-
-    val unauthorizedForReadPartitionData = unauthorizedTopics.map {
-      case tp => (tp, new FetchResponse.PartitionData(Errors.TOPIC_AUTHORIZATION_FAILED,
-        FetchResponse.INVALID_HIGHWATERMARK, FetchResponse.INVALID_LAST_STABLE_OFFSET, FetchResponse.INVALID_LOG_START_OFFSET, null, MemoryRecords.EMPTY))
-    }
-
     def convertedPartitionData(tp: TopicPartition, data: FetchResponse.PartitionData) = {
 
       // Down-conversion of the fetched records is needed when the stored magic version is
@@ -547,8 +537,7 @@ class KafkaApis(val requestChannel: RequestChannel,
         }
       }
 
-      val mergedPartitionData = partitionData ++ unauthorizedForReadPartitionData ++ nonExistingPartitionData
-
+      val mergedPartitionData = partitionData ++ unauthorizedTopicResponseData ++ nonExistingTopicResponseData
       val fetchedPartitionData = new util.LinkedHashMap[TopicPartition, FetchResponse.PartitionData]()
 
       mergedPartitionData.foreach { case (topicPartition, data) =>
@@ -1393,23 +1382,22 @@ class KafkaApis(val requestChannel: RequestChannel,
   def handleDeleteTopicsRequest(request: RequestChannel.Request) {
     val deleteTopicRequest = request.body[DeleteTopicsRequest]
 
-    var unauthorizedTopics = Set[String]()
-    var nonExistingTopics = Set[String]()
-    var authorizedForDeleteTopics =  Set[String]()
+    val unauthorizedTopicErrors = mutable.Map[String, Errors]()
+    val nonExistingTopicErrors = mutable.Map[String, Errors]()
+    val authorizedForDeleteTopics =  mutable.Set[String]()
 
     for (topic <- deleteTopicRequest.topics.asScala) {
       if (!authorize(request.session, Delete, new Resource(Topic, topic)))
-        unauthorizedTopics += topic
+        unauthorizedTopicErrors += topic -> Errors.TOPIC_AUTHORIZATION_FAILED
       else if (!metadataCache.contains(topic))
-        nonExistingTopics += topic
+        nonExistingTopicErrors += topic -> Errors.UNKNOWN_TOPIC_OR_PARTITION
       else
-        authorizedForDeleteTopics += topic
+        authorizedForDeleteTopics.add(topic)
     }
 
-    def sendResponseCallback(results: Map[String, Errors]): Unit = {
+    def sendResponseCallback(authorizedTopicErrors: Map[String, Errors]): Unit = {
       def createResponse(requestThrottleMs: Int): AbstractResponse = {
-        val completeResults = unauthorizedTopics.map(topic => (topic, Errors.TOPIC_AUTHORIZATION_FAILED)).toMap ++
-            nonExistingTopics.map(topic => (topic, Errors.UNKNOWN_TOPIC_OR_PARTITION)).toMap ++ results
+        val completeResults = unauthorizedTopicErrors ++ nonExistingTopicErrors ++ authorizedTopicErrors
         val responseBody = new DeleteTopicsResponse(requestThrottleMs, completeResults.asJava)
         trace(s"Sending delete topics response $responseBody for correlation id ${request.header.correlationId} to client ${request.header.clientId}.")
         responseBody
@@ -1439,28 +1427,24 @@ class KafkaApis(val requestChannel: RequestChannel,
   def handleDeleteRecordsRequest(request: RequestChannel.Request) {
     val deleteRecordsRequest = request.body[DeleteRecordsRequest]
 
-    var unauthorizedTopics = Set[TopicPartition]()
-    var nonExistingTopics = Set[TopicPartition]()
-    var authorizedForDeleteTopics = mutable.Map[TopicPartition, Long]()
+    val unauthorizedTopicResponses = mutable.Map[TopicPartition, DeleteRecordsResponse.PartitionResponse]()
+    val nonExistingTopicResponses = mutable.Map[TopicPartition, DeleteRecordsResponse.PartitionResponse]()
+    val authorizedForDeleteTopicOffsets = mutable.Map[TopicPartition, Long]()
 
     for ((topicPartition, offset) <- deleteRecordsRequest.partitionOffsets.asScala) {
       if (!authorize(request.session, Delete, new Resource(Topic, topicPartition.topic)))
-        unauthorizedTopics += topicPartition
+        unauthorizedTopicResponses += topicPartition -> new DeleteRecordsResponse.PartitionResponse(
+          DeleteRecordsResponse.INVALID_LOW_WATERMARK, Errors.TOPIC_AUTHORIZATION_FAILED)
       else if (!metadataCache.contains(topicPartition.topic))
-        nonExistingTopics += topicPartition
+        nonExistingTopicResponses += topicPartition -> new DeleteRecordsResponse.PartitionResponse(
+          DeleteRecordsResponse.INVALID_LOW_WATERMARK, Errors.UNKNOWN_TOPIC_OR_PARTITION)
       else
-        authorizedForDeleteTopics += (topicPartition -> offset)
+        authorizedForDeleteTopicOffsets += (topicPartition -> offset)
     }
 
     // the callback for sending a DeleteRecordsResponse
-    def sendResponseCallback(responseStatus: Map[TopicPartition, DeleteRecordsResponse.PartitionResponse]) {
-
-      val mergedResponseStatus = responseStatus ++
-        unauthorizedTopics.map(_ ->
-          new DeleteRecordsResponse.PartitionResponse(DeleteRecordsResponse.INVALID_LOW_WATERMARK, Errors.TOPIC_AUTHORIZATION_FAILED)) ++
-        nonExistingTopics.map(_ ->
-          new DeleteRecordsResponse.PartitionResponse(DeleteRecordsResponse.INVALID_LOW_WATERMARK, Errors.UNKNOWN_TOPIC_OR_PARTITION))
-
+    def sendResponseCallback(authorizedTopicResponses: Map[TopicPartition, DeleteRecordsResponse.PartitionResponse]) {
+      val mergedResponseStatus = authorizedTopicResponses ++ unauthorizedTopicResponses ++ nonExistingTopicResponses
       mergedResponseStatus.foreach { case (topicPartition, status) =>
         if (status.error != Errors.NONE) {
           debug("DeleteRecordsRequest with correlation id %d from client %s on partition %s failed due to %s".format(
@@ -1475,13 +1459,13 @@ class KafkaApis(val requestChannel: RequestChannel,
         new DeleteRecordsResponse(requestThrottleMs, mergedResponseStatus.asJava))
     }
 
-    if (authorizedForDeleteTopics.isEmpty)
+    if (authorizedForDeleteTopicOffsets.isEmpty)
       sendResponseCallback(Map.empty)
     else {
       // call the replica manager to append messages to the replicas
       replicaManager.deleteRecords(
         deleteRecordsRequest.timeout.toLong,
-        authorizedForDeleteTopics.mapValues(_.toLong),
+        authorizedForDeleteTopicOffsets,
         sendResponseCallback)
     }
   }
@@ -1650,45 +1634,38 @@ class KafkaApis(val requestChannel: RequestChannel,
     ensureInterBrokerVersion(KAFKA_0_11_0_IV0)
     val addPartitionsToTxnRequest = request.body[AddPartitionsToTxnRequest]
     val transactionalId = addPartitionsToTxnRequest.transactionalId
-    val partitionsToAdd = addPartitionsToTxnRequest.partitions
+    val partitionsToAdd = addPartitionsToTxnRequest.partitions.asScala
     if (!authorize(request.session, Write, new Resource(TransactionalId, transactionalId)))
       sendResponseMaybeThrottle(request, requestThrottleMs =>
         addPartitionsToTxnRequest.getErrorResponse(requestThrottleMs, Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED.exception))
     else {
-      val internalTopics = partitionsToAdd.asScala.filter {tp => org.apache.kafka.common.internals.Topic.isInternal(tp.topic())}
-
-      var unauthorizedTopics = Set[TopicPartition]()
-      var nonExistingTopics = Set[TopicPartition]()
-      var authorizedPartitions =  Set[TopicPartition]()
-
-      for ( topicPartition <- partitionsToAdd.asScala) {
-        if (!authorize(request.session, Write, new Resource(Topic, topicPartition.topic)))
-          unauthorizedTopics += topicPartition
+      val unauthorizedTopicErrors = mutable.Map[TopicPartition, Errors]()
+      val nonExistingTopicErrors = mutable.Map[TopicPartition, Errors]()
+      val authorizedPartitions = mutable.Set[TopicPartition]()
+
+      for (topicPartition <- partitionsToAdd) {
+        if (org.apache.kafka.common.internals.Topic.isInternal(topicPartition.topic) ||
+            !authorize(request.session, Write, new Resource(Topic, topicPartition.topic)))
+          unauthorizedTopicErrors += topicPartition -> Errors.TOPIC_AUTHORIZATION_FAILED
         else if (!metadataCache.contains(topicPartition.topic))
-          nonExistingTopics += topicPartition
+          nonExistingTopicErrors += topicPartition -> Errors.UNKNOWN_TOPIC_OR_PARTITION
         else
-          authorizedPartitions += topicPartition
+          authorizedPartitions.add(topicPartition)
       }
 
-      if (unauthorizedTopics.nonEmpty
-        || nonExistingTopics.nonEmpty
-        || internalTopics.nonEmpty) {
-
+      if (unauthorizedTopicErrors.nonEmpty || nonExistingTopicErrors.nonEmpty) {
         // Any failed partition check causes the entire request to fail. We send the appropriate error codes for the
         // partitions which failed, and an 'OPERATION_NOT_ATTEMPTED' error code for the partitions which succeeded
         // the authorization check to indicate that they were not added to the transaction.
-        val partitionErrors = (unauthorizedTopics.map(_ -> Errors.TOPIC_AUTHORIZATION_FAILED) ++
-          nonExistingTopics.map(_ -> Errors.UNKNOWN_TOPIC_OR_PARTITION) ++
-          internalTopics.map(_ -> Errors.TOPIC_AUTHORIZATION_FAILED) ++
-          authorizedPartitions.map(_ -> Errors.OPERATION_NOT_ATTEMPTED)).toMap
-
+        val partitionErrors = unauthorizedTopicErrors ++ nonExistingTopicErrors ++
+          authorizedPartitions.map(_ -> Errors.OPERATION_NOT_ATTEMPTED)
         sendResponseMaybeThrottle(request, requestThrottleMs =>
           new AddPartitionsToTxnResponse(requestThrottleMs, partitionErrors.asJava))
       } else {
         def sendResponseCallback(error: Errors): Unit = {
           def createResponse(requestThrottleMs: Int): AbstractResponse = {
             val responseBody: AddPartitionsToTxnResponse = new AddPartitionsToTxnResponse(requestThrottleMs,
-              partitionsToAdd.asScala.map{tp => (tp, error)}.toMap.asJava)
+              partitionsToAdd.map{tp => (tp, error)}.toMap.asJava)
             trace(s"Completed $transactionalId's AddPartitionsToTxnRequest with partitions $partitionsToAdd: errors: $error from client ${request.header.clientId}")
             responseBody
           }
@@ -1699,7 +1676,7 @@ class KafkaApis(val requestChannel: RequestChannel,
         txnCoordinator.handleAddPartitionsToTransaction(transactionalId,
           addPartitionsToTxnRequest.producerId,
           addPartitionsToTxnRequest.producerEpoch,
-          partitionsToAdd.asScala.toSet,
+          authorizedPartitions,
           sendResponseCallback)
       }
     }
@@ -1749,25 +1726,22 @@ class KafkaApis(val requestChannel: RequestChannel,
     else if (!authorize(request.session, Read, new Resource(Group, txnOffsetCommitRequest.consumerGroupId)))
       sendErrorResponseMaybeThrottle(request, Errors.GROUP_AUTHORIZATION_FAILED.exception)
     else {
-      var unauthorizedTopics = Set[TopicPartition]()
-      var nonExistingTopics = Set[TopicPartition]()
-      var authorizedTopics = mutable.Map[TopicPartition, TxnOffsetCommitRequest.CommittedOffset]()
+      val unauthorizedTopicErrors = mutable.Map[TopicPartition, Errors]()
+      val nonExistingTopicErrors = mutable.Map[TopicPartition, Errors]()
+      val authorizedTopicCommittedOffsets = mutable.Map[TopicPartition, TxnOffsetCommitRequest.CommittedOffset]()
 
       for ((topicPartition, commitedOffset) <- txnOffsetCommitRequest.offsets.asScala) {
         if (!authorize(request.session, Read, new Resource(Topic, topicPartition.topic)))
-          unauthorizedTopics += topicPartition
+          unauthorizedTopicErrors += topicPartition -> Errors.TOPIC_AUTHORIZATION_FAILED
         else if (!metadataCache.contains(topicPartition.topic))
-          nonExistingTopics += topicPartition
+          nonExistingTopicErrors += topicPartition -> Errors.UNKNOWN_TOPIC_OR_PARTITION
         else
-          authorizedTopics += (topicPartition -> commitedOffset)
+          authorizedTopicCommittedOffsets += (topicPartition -> commitedOffset)
       }
 
       // the callback for sending an offset commit response
-      def sendResponseCallback(commitStatus: Map[TopicPartition, Errors]) {
-        val combinedCommitStatus = commitStatus ++
-          unauthorizedTopics.map(_ -> Errors.TOPIC_AUTHORIZATION_FAILED) ++
-          nonExistingTopics.map(_ -> Errors.UNKNOWN_TOPIC_OR_PARTITION)
-
+      def sendResponseCallback(authorizedTopicErrors: Map[TopicPartition, Errors]) {
+        val combinedCommitStatus = authorizedTopicErrors ++ unauthorizedTopicErrors ++ nonExistingTopicErrors
         if (isDebugEnabled)
           combinedCommitStatus.foreach { case (topicPartition, error) =>
             if (error != Errors.NONE) {
@@ -1779,10 +1753,10 @@ class KafkaApis(val requestChannel: RequestChannel,
           new TxnOffsetCommitResponse(requestThrottleMs, combinedCommitStatus.asJava))
       }
 
-      if (authorizedTopics.isEmpty)
+      if (authorizedTopicCommittedOffsets.isEmpty)
         sendResponseCallback(Map.empty)
       else {
-        val offsetMetadata = convertTxnOffsets(authorizedTopics.toMap)
+        val offsetMetadata = convertTxnOffsets(authorizedTopicCommittedOffsets.toMap)
         groupCoordinator.handleTxnCommitOffsets(
           txnOffsetCommitRequest.consumerGroupId,
           txnOffsetCommitRequest.producerId,

http://git-wip-us.apache.org/repos/asf/kafka/blob/1027ff3c/core/src/main/scala/kafka/utils/ZkUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala
index 0b5d43e..1df0916 100644
--- a/core/src/main/scala/kafka/utils/ZkUtils.scala
+++ b/core/src/main/scala/kafka/utils/ZkUtils.scala
@@ -39,7 +39,6 @@ import org.apache.zookeeper.AsyncCallback.{DataCallback, StringCallback}
 import org.apache.zookeeper.KeeperException.Code
 import org.apache.zookeeper.data.{ACL, Stat}
 import org.apache.zookeeper.{CreateMode, KeeperException, ZooDefs, ZooKeeper}
-import kafka.utils.Json._
 
 import scala.collection._
 import scala.collection.JavaConverters._
@@ -494,7 +493,7 @@ class ZkUtils(zkClientWrap: ZooKeeperClientWrapper,
   /**
    * Create an ephemeral node with the given path and data. Create parents if necessary.
    */
-  private def createEphemeralPath(path: String, data: String, acls: java.util.List[ACL] = UseDefaultAcls): Unit = {
+  private def createEphemeralPath(path: String, data: String, acls: java.util.List[ACL]): Unit = {
     val acl = if (acls eq UseDefaultAcls) ZkUtils.defaultAcls(isSecure, path) else acls
     try {
       zkPath.createEphemeral(path, data, acl)

http://git-wip-us.apache.org/repos/asf/kafka/blob/1027ff3c/core/src/main/scala/kafka/utils/json/JsonValue.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/json/JsonValue.scala b/core/src/main/scala/kafka/utils/json/JsonValue.scala
index 2be1880..cbc82c0 100644
--- a/core/src/main/scala/kafka/utils/json/JsonValue.scala
+++ b/core/src/main/scala/kafka/utils/json/JsonValue.scala
@@ -17,8 +17,6 @@
 
 package kafka.utils.json
 
-import scala.collection._
-
 import com.fasterxml.jackson.databind.{JsonMappingException, JsonNode}
 import com.fasterxml.jackson.databind.node.{ArrayNode, ObjectNode}