Posted to commits@kafka.apache.org by jg...@apache.org on 2017/10/10 22:03:50 UTC
kafka git commit: MINOR: Avoid some unnecessary collection copies in KafkaApis
Repository: kafka
Updated Branches:
refs/heads/trunk 90b5ce3f0 -> 1027ff3c7
MINOR: Avoid some unnecessary collection copies in KafkaApis
Author: Jason Gustafson <ja...@confluent.io>
Reviewers: Manikumar Reddy <ma...@gmail.com>, Ismael Juma <is...@juma.me.uk>
Closes #4035 from hachikuji/KAFKA-5547-followup and squashes the following commits:
f6b04ce1a [Jason Gustafson] Add a couple missed common fields
d3473b14d [Jason Gustafson] Fix compilation errors and a few warnings
58a0ae695 [Jason Gustafson] MINOR: Avoid some unnecessary collection copies in KafkaApis
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/1027ff3c
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/1027ff3c
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/1027ff3c
Branch: refs/heads/trunk
Commit: 1027ff3c769906b7b80582217cf5b4703fd6864d
Parents: 90b5ce3
Author: Jason Gustafson <ja...@confluent.io>
Authored: Tue Oct 10 15:01:10 2017 -0700
Committer: Jason Gustafson <ja...@confluent.io>
Committed: Tue Oct 10 15:01:10 2017 -0700
----------------------------------------------------------------------
.../kafka/common/protocol/CommonFields.java | 2 +
.../common/requests/FindCoordinatorRequest.java | 13 +-
.../common/requests/InitProducerIdRequest.java | 9 +-
.../kafka/common/requests/ProduceRequest.java | 13 +-
.../kafka/admin/ReassignPartitionsCommand.scala | 2 +-
.../src/main/scala/kafka/server/KafkaApis.scala | 180 ++++++++-----------
core/src/main/scala/kafka/utils/ZkUtils.scala | 3 +-
.../main/scala/kafka/utils/json/JsonValue.scala | 2 -
8 files changed, 96 insertions(+), 128 deletions(-)
----------------------------------------------------------------------
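The recurring pattern in the KafkaApis hunks below: the old handlers accumulated failed partitions in immutable Sets and then mapped those Sets into error entries every time a response was built; the new code builds the final error maps once, inside the authorization loop. A minimal sketch of the before/after shape (names and types simplified from the actual handlers):

    import scala.collection.mutable
    import org.apache.kafka.common.TopicPartition
    import org.apache.kafka.common.protocol.Errors

    // Hypothetical commit status returned by the coordinator (simplified)
    val commitStatus = Map.empty[TopicPartition, Errors]

    // Old shape: accumulate a Set, then map it to errors for each response:
    //   commitStatus ++ unauthorizedTopics.map(_ -> Errors.TOPIC_AUTHORIZATION_FAILED)
    // New shape: build the error entries once, inside the authorization loop
    val unauthorizedTopicErrors = mutable.Map[TopicPartition, Errors]()
    unauthorizedTopicErrors += (new TopicPartition("topic", 0) -> Errors.TOPIC_AUTHORIZATION_FAILED)

    // A single ++ over pre-built entries; no intermediate collection from .map
    val combinedCommitStatus = commitStatus ++ unauthorizedTopicErrors
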
http://git-wip-us.apache.org/repos/asf/kafka/blob/1027ff3c/clients/src/main/java/org/apache/kafka/common/protocol/CommonFields.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/CommonFields.java b/clients/src/main/java/org/apache/kafka/common/protocol/CommonFields.java
index 472a791..e1d1884 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/CommonFields.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/CommonFields.java
@@ -35,6 +35,8 @@ public class CommonFields {
// Transactional APIs
public static final Field.Str TRANSACTIONAL_ID = new Field.Str("transactional_id", "The transactional id corresponding to the transaction.");
+ public static final Field.NullableStr NULLABLE_TRANSACTIONAL_ID = new Field.NullableStr("transactional_id",
+ "The transactional id or null if the producer is not transactional");
public static final Field.Int64 PRODUCER_ID = new Field.Int64("producer_id", "Current producer id in use by the transactional id.");
public static final Field.Int16 PRODUCER_EPOCH = new Field.Int16("producer_epoch", "Current epoch associated with the producer id.");
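For context, the shared NULLABLE_TRANSACTIONAL_ID field replaces per-request string key names and carries its nullability in the type, so requests read and write it with typed accessors instead of getString/set on a raw key. A small sketch of that usage, assuming a Struct built from a schema containing the field:

    import org.apache.kafka.common.protocol.CommonFields.NULLABLE_TRANSACTIONAL_ID
    import org.apache.kafka.common.protocol.types.{Schema, Struct}

    val schema = new Schema(NULLABLE_TRANSACTIONAL_ID)
    val struct = new Struct(schema)
    val txnId: String = null                       // a non-transactional producer
    struct.set(NULLABLE_TRANSACTIONAL_ID, txnId)   // legal: the field is nullable
    struct.get(NULLABLE_TRANSACTIONAL_ID)          // returns null rather than throwing
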
http://git-wip-us.apache.org/repos/asf/kafka/blob/1027ff3c/clients/src/main/java/org/apache/kafka/common/requests/FindCoordinatorRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/FindCoordinatorRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/FindCoordinatorRequest.java
index c94bcde..9bfc968 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/FindCoordinatorRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/FindCoordinatorRequest.java
@@ -26,16 +26,15 @@ import org.apache.kafka.common.protocol.types.Struct;
import java.nio.ByteBuffer;
+import static org.apache.kafka.common.protocol.CommonFields.GROUP_ID;
import static org.apache.kafka.common.protocol.types.Type.INT8;
import static org.apache.kafka.common.protocol.types.Type.STRING;
public class FindCoordinatorRequest extends AbstractRequest {
- private static final String GROUP_ID_KEY_NAME = "group_id";
private static final String COORDINATOR_KEY_KEY_NAME = "coordinator_key";
private static final String COORDINATOR_TYPE_KEY_NAME = "coordinator_type";
- private static final Schema FIND_COORDINATOR_REQUEST_V0 = new Schema(
- new Field("group_id", STRING, "The unique group id."));
+ private static final Schema FIND_COORDINATOR_REQUEST_V0 = new Schema(GROUP_ID);
private static final Schema FIND_COORDINATOR_REQUEST_V1 = new Schema(
new Field("coordinator_key", STRING, "Id to use for finding the coordinator (for groups, this is the groupId, " +
@@ -102,8 +101,8 @@ public class FindCoordinatorRequest extends AbstractRequest {
this.coordinatorType = CoordinatorType.forId(struct.getByte(COORDINATOR_TYPE_KEY_NAME));
else
this.coordinatorType = CoordinatorType.GROUP;
- if (struct.hasField(GROUP_ID_KEY_NAME))
- this.coordinatorKey = struct.getString(GROUP_ID_KEY_NAME);
+ if (struct.hasField(GROUP_ID))
+ this.coordinatorKey = struct.get(GROUP_ID);
else
this.coordinatorKey = struct.getString(COORDINATOR_KEY_KEY_NAME);
}
@@ -138,8 +137,8 @@ public class FindCoordinatorRequest extends AbstractRequest {
@Override
protected Struct toStruct() {
Struct struct = new Struct(ApiKeys.FIND_COORDINATOR.requestSchema(version()));
- if (struct.hasField(GROUP_ID_KEY_NAME))
- struct.set(GROUP_ID_KEY_NAME, coordinatorKey);
+ if (struct.hasField(GROUP_ID))
+ struct.set(GROUP_ID, coordinatorKey);
else
struct.set(COORDINATOR_KEY_KEY_NAME, coordinatorKey);
if (struct.hasField(COORDINATOR_TYPE_KEY_NAME))
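The hasField checks above are how a single request class serves both schema versions: v0 carries group_id while v1 renamed it to coordinator_key. A sketch of the same dispatch idiom, using the literal v1 key name in place of the file's private constant:

    import org.apache.kafka.common.protocol.CommonFields.GROUP_ID
    import org.apache.kafka.common.protocol.types.Struct

    def coordinatorKey(struct: Struct): String =
      if (struct.hasField(GROUP_ID)) struct.get(GROUP_ID)   // v0: group_id
      else struct.getString("coordinator_key")              // v1+: coordinator_key
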
http://git-wip-us.apache.org/repos/asf/kafka/blob/1027ff3c/clients/src/main/java/org/apache/kafka/common/requests/InitProducerIdRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/InitProducerIdRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/InitProducerIdRequest.java
index fa14a97..6c659ff 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/InitProducerIdRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/InitProducerIdRequest.java
@@ -24,17 +24,16 @@ import org.apache.kafka.common.protocol.types.Struct;
import java.nio.ByteBuffer;
+import static org.apache.kafka.common.protocol.CommonFields.NULLABLE_TRANSACTIONAL_ID;
import static org.apache.kafka.common.protocol.types.Type.INT32;
-import static org.apache.kafka.common.protocol.types.Type.NULLABLE_STRING;
public class InitProducerIdRequest extends AbstractRequest {
public static final int NO_TRANSACTION_TIMEOUT_MS = Integer.MAX_VALUE;
- private static final String TRANSACTIONAL_ID_KEY_NAME = "transactional_id";
private static final String TRANSACTION_TIMEOUT_KEY_NAME = "transaction_timeout_ms";
private static final Schema INIT_PRODUCER_ID_REQUEST_V0 = new Schema(
- new Field(TRANSACTIONAL_ID_KEY_NAME, NULLABLE_STRING, "The transactional id whose producer id we want to retrieve or generate."),
+ NULLABLE_TRANSACTIONAL_ID,
new Field(TRANSACTION_TIMEOUT_KEY_NAME, INT32, "The time in ms to wait for before aborting idle transactions sent by this producer."));
public static Schema[] schemaVersions() {
@@ -79,7 +78,7 @@ public class InitProducerIdRequest extends AbstractRequest {
public InitProducerIdRequest(Struct struct, short version) {
super(version);
- this.transactionalId = struct.getString(TRANSACTIONAL_ID_KEY_NAME);
+ this.transactionalId = struct.get(NULLABLE_TRANSACTIONAL_ID);
this.transactionTimeoutMs = struct.getInt(TRANSACTION_TIMEOUT_KEY_NAME);
}
@@ -109,7 +108,7 @@ public class InitProducerIdRequest extends AbstractRequest {
@Override
protected Struct toStruct() {
Struct struct = new Struct(ApiKeys.INIT_PRODUCER_ID.requestSchema(version()));
- struct.set(TRANSACTIONAL_ID_KEY_NAME, transactionalId);
+ struct.set(NULLABLE_TRANSACTIONAL_ID, transactionalId);
struct.set(TRANSACTION_TIMEOUT_KEY_NAME, transactionTimeoutMs);
return struct;
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/1027ff3c/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java
index ee4a2e2..91e3aeb 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java
@@ -18,6 +18,7 @@ package org.apache.kafka.common.requests;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.CommonFields;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.protocol.types.ArrayOf;
import org.apache.kafka.common.protocol.types.Field;
@@ -39,15 +40,14 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import static org.apache.kafka.common.protocol.CommonFields.NULLABLE_TRANSACTIONAL_ID;
import static org.apache.kafka.common.protocol.CommonFields.PARTITION_ID;
import static org.apache.kafka.common.protocol.CommonFields.TOPIC_NAME;
import static org.apache.kafka.common.protocol.types.Type.INT16;
import static org.apache.kafka.common.protocol.types.Type.INT32;
-import static org.apache.kafka.common.protocol.types.Type.NULLABLE_STRING;
import static org.apache.kafka.common.protocol.types.Type.RECORDS;
public class ProduceRequest extends AbstractRequest {
- private static final String TRANSACTIONAL_ID_KEY_NAME = "transactional_id";
private static final String ACKS_KEY_NAME = "acks";
private static final String TIMEOUT_KEY_NAME = "timeout";
private static final String TOPIC_DATA_KEY_NAME = "topic_data";
@@ -87,8 +87,7 @@ public class ProduceRequest extends AbstractRequest {
// Produce request V3 adds the transactional id which is used for authorization when attempting to write
// transactional data. This version also adds support for message format V2.
private static final Schema PRODUCE_REQUEST_V3 = new Schema(
- new Field(TRANSACTIONAL_ID_KEY_NAME, NULLABLE_STRING, "The transactional ID of the producer. This is used to " +
- "authorize transaction produce requests. This can be null for non-transactional producers."),
+ CommonFields.NULLABLE_TRANSACTIONAL_ID,
new Field(ACKS_KEY_NAME, INT16, "The number of acknowledgments the producer requires the leader to have " +
"received before considering a request complete. Allowed values: 0 for no acknowledgments, 1 " +
"for only the leader and -1 for the full ISR."),
@@ -229,7 +228,7 @@ public class ProduceRequest extends AbstractRequest {
partitionSizes = createPartitionSizes(partitionRecords);
acks = struct.getShort(ACKS_KEY_NAME);
timeout = struct.getInt(TIMEOUT_KEY_NAME);
- transactionalId = struct.hasField(TRANSACTIONAL_ID_KEY_NAME) ? struct.getString(TRANSACTIONAL_ID_KEY_NAME) : null;
+ transactionalId = struct.getOrElse(NULLABLE_TRANSACTIONAL_ID, null);
}
private void validateRecords(short version, MemoryRecords records) {
@@ -268,9 +267,7 @@ public class ProduceRequest extends AbstractRequest {
Map<String, Map<Integer, MemoryRecords>> recordsByTopic = CollectionUtils.groupDataByTopic(partitionRecords);
struct.set(ACKS_KEY_NAME, acks);
struct.set(TIMEOUT_KEY_NAME, timeout);
-
- if (struct.hasField(TRANSACTIONAL_ID_KEY_NAME))
- struct.set(TRANSACTIONAL_ID_KEY_NAME, transactionalId);
+ struct.setIfExists(NULLABLE_TRANSACTIONAL_ID, transactionalId);
List<Struct> topicDatas = new ArrayList<>(recordsByTopic.size());
for (Map.Entry<String, Map<Integer, MemoryRecords>> topicEntry : recordsByTopic.entrySet()) {
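getOrElse and setIfExists, both used above, are the version-tolerant accessors: ProduceRequest v0-v2 schemas have no transactional_id field, so the read falls back to the supplied default and the write is skipped, matching what the replaced hasField branches did. A short sketch (the v2 struct here is illustrative):

    import org.apache.kafka.common.protocol.ApiKeys
    import org.apache.kafka.common.protocol.CommonFields.NULLABLE_TRANSACTIONAL_ID
    import org.apache.kafka.common.protocol.types.Struct

    val v2Struct = new Struct(ApiKeys.PRODUCE.requestSchema(2.toShort))
    v2Struct.setIfExists(NULLABLE_TRANSACTIONAL_ID, "txn-1")           // no-op: v2 lacks the field
    val txnId = v2Struct.getOrElse(NULLABLE_TRANSACTIONAL_ID, null)    // falls back to null
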
http://git-wip-us.apache.org/repos/asf/kafka/blob/1027ff3c/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
index af81697..33884d6 100755
--- a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
+++ b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
@@ -30,7 +30,7 @@ import org.apache.kafka.common.utils.Utils
import org.apache.kafka.common.security.JaasUtils
import org.apache.kafka.common.TopicPartitionReplica
import org.apache.kafka.common.errors.{LogDirNotFoundException, ReplicaNotAvailableException}
-import org.apache.kafka.clients.admin.{AdminClientConfig, AlterReplicaLogDirsOptions, DescribeReplicaLogDirsResult, AdminClient => JAdminClient}
+import org.apache.kafka.clients.admin.{AdminClientConfig, AlterReplicaLogDirsOptions, AdminClient => JAdminClient}
import LogConfig._
import joptsimple.OptionParser
import org.apache.kafka.clients.admin.DescribeReplicaLogDirsResult.ReplicaLogDirInfo
http://git-wip-us.apache.org/repos/asf/kafka/blob/1027ff3c/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 8eceaa0..0066702 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -265,25 +265,24 @@ class KafkaApis(val requestChannel: RequestChannel,
sendResponseMaybeThrottle(request, requestThrottleMs => new OffsetCommitResponse(requestThrottleMs, results.asJava))
} else {
- var unauthorizedTopics = Set[TopicPartition]()
- var nonExistingTopics = Set[TopicPartition]()
- var authorizedTopics = mutable.Map[TopicPartition, OffsetCommitRequest.PartitionData]()
+ val unauthorizedTopicErrors = mutable.Map[TopicPartition, Errors]()
+ val nonExistingTopicErrors = mutable.Map[TopicPartition, Errors]()
+ val authorizedTopicRequestInfoBldr = immutable.Map.newBuilder[TopicPartition, OffsetCommitRequest.PartitionData]
- for ((topicPartition, partitionData) <- offsetCommitRequest.offsetData.asScala.toMap) {
+ for ((topicPartition, partitionData) <- offsetCommitRequest.offsetData.asScala) {
if (!authorize(request.session, Read, new Resource(Topic, topicPartition.topic)))
- unauthorizedTopics += topicPartition
+ unauthorizedTopicErrors += (topicPartition -> Errors.TOPIC_AUTHORIZATION_FAILED)
else if (!metadataCache.contains(topicPartition.topic))
- nonExistingTopics += topicPartition
+ nonExistingTopicErrors += (topicPartition -> Errors.UNKNOWN_TOPIC_OR_PARTITION)
else
- authorizedTopics += (topicPartition -> partitionData)
+ authorizedTopicRequestInfoBldr += (topicPartition -> partitionData)
}
+ val authorizedTopicRequestInfo = authorizedTopicRequestInfoBldr.result()
+
// the callback for sending an offset commit response
def sendResponseCallback(commitStatus: immutable.Map[TopicPartition, Errors]) {
- val combinedCommitStatus = commitStatus ++
- unauthorizedTopics.map(_ -> Errors.TOPIC_AUTHORIZATION_FAILED) ++
- nonExistingTopics.map(_ -> Errors.UNKNOWN_TOPIC_OR_PARTITION)
-
+ val combinedCommitStatus = commitStatus ++ unauthorizedTopicErrors ++ nonExistingTopicErrors
if (isDebugEnabled)
combinedCommitStatus.foreach { case (topicPartition, error) =>
if (error != Errors.NONE) {
@@ -295,11 +294,11 @@ class KafkaApis(val requestChannel: RequestChannel,
new OffsetCommitResponse(requestThrottleMs, combinedCommitStatus.asJava))
}
- if (authorizedTopics.isEmpty)
+ if (authorizedTopicRequestInfo.isEmpty)
sendResponseCallback(Map.empty)
else if (header.apiVersion == 0) {
// for version 0 always store offsets to ZK
- val responseInfo = authorizedTopics.map {
+ val responseInfo = authorizedTopicRequestInfo.map {
case (topicPartition, partitionData) =>
val topicDirs = new ZKGroupTopicDirs(offsetCommitRequest.groupId, topicPartition.topic)
try {
@@ -313,7 +312,7 @@ class KafkaApis(val requestChannel: RequestChannel,
case e: Throwable => (topicPartition, Errors.forException(e))
}
}
- sendResponseCallback(responseInfo.toMap)
+ sendResponseCallback(responseInfo)
} else {
// for version 1 and beyond store offsets in offset manager
@@ -334,7 +333,7 @@ class KafkaApis(val requestChannel: RequestChannel,
// - If v2 we use the default expiration timestamp
val currentTimestamp = time.milliseconds
val defaultExpireTimestamp = offsetRetention + currentTimestamp
- val partitionData = authorizedTopics.mapValues { partitionData =>
+ val partitionData = authorizedTopicRequestInfo.mapValues { partitionData =>
val metadata = if (partitionData.metadata == null) OffsetMetadata.NoMetadata else partitionData.metadata
new OffsetAndMetadata(
offsetMetadata = OffsetMetadata(partitionData.offset, metadata),
@@ -353,7 +352,7 @@ class KafkaApis(val requestChannel: RequestChannel,
offsetCommitRequest.groupId,
offsetCommitRequest.memberId,
offsetCommitRequest.generationId,
- partitionData.toMap,
+ partitionData,
sendResponseCallback)
}
}
@@ -381,15 +380,15 @@ class KafkaApis(val requestChannel: RequestChannel,
return
}
- var unauthorizedTopics = Set[TopicPartition]()
- var nonExistingTopics = Set[TopicPartition]()
- var authorizedRequestInfo = mutable.Map[TopicPartition, MemoryRecords]()
+ val unauthorizedTopicResponses = mutable.Map[TopicPartition, PartitionResponse]()
+ val nonExistingTopicResponses = mutable.Map[TopicPartition, PartitionResponse]()
+ val authorizedRequestInfo = mutable.Map[TopicPartition, MemoryRecords]()
for ((topicPartition, memoryRecords) <- produceRequest.partitionRecordsOrFail.asScala) {
if (!authorize(request.session, Write, new Resource(Topic, topicPartition.topic)))
- unauthorizedTopics += topicPartition
+ unauthorizedTopicResponses += topicPartition -> new PartitionResponse(Errors.TOPIC_AUTHORIZATION_FAILED)
else if (!metadataCache.contains(topicPartition.topic))
- nonExistingTopics += topicPartition
+ nonExistingTopicResponses += topicPartition -> new PartitionResponse(Errors.UNKNOWN_TOPIC_OR_PARTITION)
else
authorizedRequestInfo += (topicPartition -> memoryRecords)
}
@@ -397,10 +396,7 @@ class KafkaApis(val requestChannel: RequestChannel,
// the callback for sending a produce response
def sendResponseCallback(responseStatus: Map[TopicPartition, PartitionResponse]) {
- val mergedResponseStatus = responseStatus ++
- unauthorizedTopics.map(_ -> new PartitionResponse(Errors.TOPIC_AUTHORIZATION_FAILED)) ++
- nonExistingTopics.map(_ -> new PartitionResponse(Errors.UNKNOWN_TOPIC_OR_PARTITION))
-
+ val mergedResponseStatus = responseStatus ++ unauthorizedTopicResponses ++ nonExistingTopicResponses
var errorInResponse = false
mergedResponseStatus.foreach { case (topicPartition, status) =>
@@ -483,29 +479,23 @@ class KafkaApis(val requestChannel: RequestChannel,
val versionId = request.header.apiVersion
val clientId = request.header.clientId
- var unauthorizedTopics = Set[TopicPartition]()
- var nonExistingTopics = Set[TopicPartition]()
- var authorizedRequestInfo = mutable.ArrayBuffer[(TopicPartition, FetchRequest.PartitionData)]()
+ val unauthorizedTopicResponseData = mutable.ArrayBuffer[(TopicPartition, FetchResponse.PartitionData)]()
+ val nonExistingTopicResponseData = mutable.ArrayBuffer[(TopicPartition, FetchResponse.PartitionData)]()
+ val authorizedRequestInfo = mutable.ArrayBuffer[(TopicPartition, FetchRequest.PartitionData)]()
for ((topicPartition, partitionData) <- fetchRequest.fetchData.asScala) {
if (!authorize(request.session, Read, new Resource(Topic, topicPartition.topic)))
- unauthorizedTopics += topicPartition
+ unauthorizedTopicResponseData += topicPartition -> new FetchResponse.PartitionData(Errors.TOPIC_AUTHORIZATION_FAILED,
+ FetchResponse.INVALID_HIGHWATERMARK, FetchResponse.INVALID_LAST_STABLE_OFFSET,
+ FetchResponse.INVALID_LOG_START_OFFSET, null, MemoryRecords.EMPTY)
else if (!metadataCache.contains(topicPartition.topic))
- nonExistingTopics += topicPartition
+ nonExistingTopicResponseData += topicPartition -> new FetchResponse.PartitionData(Errors.UNKNOWN_TOPIC_OR_PARTITION,
+ FetchResponse.INVALID_HIGHWATERMARK, FetchResponse.INVALID_LAST_STABLE_OFFSET,
+ FetchResponse.INVALID_LOG_START_OFFSET, null, MemoryRecords.EMPTY)
else
authorizedRequestInfo += (topicPartition -> partitionData)
}
- val nonExistingPartitionData = nonExistingTopics.map {
- case tp => (tp, new FetchResponse.PartitionData(Errors.UNKNOWN_TOPIC_OR_PARTITION,
- FetchResponse.INVALID_HIGHWATERMARK, FetchResponse.INVALID_LAST_STABLE_OFFSET, FetchResponse.INVALID_LOG_START_OFFSET, null, MemoryRecords.EMPTY))
- }
-
- val unauthorizedForReadPartitionData = unauthorizedTopics.map {
- case tp => (tp, new FetchResponse.PartitionData(Errors.TOPIC_AUTHORIZATION_FAILED,
- FetchResponse.INVALID_HIGHWATERMARK, FetchResponse.INVALID_LAST_STABLE_OFFSET, FetchResponse.INVALID_LOG_START_OFFSET, null, MemoryRecords.EMPTY))
- }
-
def convertedPartitionData(tp: TopicPartition, data: FetchResponse.PartitionData) = {
// Down-conversion of the fetched records is needed when the stored magic version is
@@ -547,8 +537,7 @@ class KafkaApis(val requestChannel: RequestChannel,
}
}
- val mergedPartitionData = partitionData ++ unauthorizedForReadPartitionData ++ nonExistingPartitionData
-
+ val mergedPartitionData = partitionData ++ unauthorizedTopicResponseData ++ nonExistingTopicResponseData
val fetchedPartitionData = new util.LinkedHashMap[TopicPartition, FetchResponse.PartitionData]()
mergedPartitionData.foreach { case (topicPartition, data) =>
@@ -1393,23 +1382,22 @@ class KafkaApis(val requestChannel: RequestChannel,
def handleDeleteTopicsRequest(request: RequestChannel.Request) {
val deleteTopicRequest = request.body[DeleteTopicsRequest]
- var unauthorizedTopics = Set[String]()
- var nonExistingTopics = Set[String]()
- var authorizedForDeleteTopics = Set[String]()
+ val unauthorizedTopicErrors = mutable.Map[String, Errors]()
+ val nonExistingTopicErrors = mutable.Map[String, Errors]()
+ val authorizedForDeleteTopics = mutable.Set[String]()
for (topic <- deleteTopicRequest.topics.asScala) {
if (!authorize(request.session, Delete, new Resource(Topic, topic)))
- unauthorizedTopics += topic
+ unauthorizedTopicErrors += topic -> Errors.TOPIC_AUTHORIZATION_FAILED
else if (!metadataCache.contains(topic))
- nonExistingTopics += topic
+ nonExistingTopicErrors += topic -> Errors.UNKNOWN_TOPIC_OR_PARTITION
else
- authorizedForDeleteTopics += topic
+ authorizedForDeleteTopics.add(topic)
}
- def sendResponseCallback(results: Map[String, Errors]): Unit = {
+ def sendResponseCallback(authorizedTopicErrors: Map[String, Errors]): Unit = {
def createResponse(requestThrottleMs: Int): AbstractResponse = {
- val completeResults = unauthorizedTopics.map(topic => (topic, Errors.TOPIC_AUTHORIZATION_FAILED)).toMap ++
- nonExistingTopics.map(topic => (topic, Errors.UNKNOWN_TOPIC_OR_PARTITION)).toMap ++ results
+ val completeResults = unauthorizedTopicErrors ++ nonExistingTopicErrors ++ authorizedTopicErrors
val responseBody = new DeleteTopicsResponse(requestThrottleMs, completeResults.asJava)
trace(s"Sending delete topics response $responseBody for correlation id ${request.header.correlationId} to client ${request.header.clientId}.")
responseBody
@@ -1439,28 +1427,24 @@ class KafkaApis(val requestChannel: RequestChannel,
def handleDeleteRecordsRequest(request: RequestChannel.Request) {
val deleteRecordsRequest = request.body[DeleteRecordsRequest]
- var unauthorizedTopics = Set[TopicPartition]()
- var nonExistingTopics = Set[TopicPartition]()
- var authorizedForDeleteTopics = mutable.Map[TopicPartition, Long]()
+ val unauthorizedTopicResponses = mutable.Map[TopicPartition, DeleteRecordsResponse.PartitionResponse]()
+ val nonExistingTopicResponses = mutable.Map[TopicPartition, DeleteRecordsResponse.PartitionResponse]()
+ val authorizedForDeleteTopicOffsets = mutable.Map[TopicPartition, Long]()
for ((topicPartition, offset) <- deleteRecordsRequest.partitionOffsets.asScala) {
if (!authorize(request.session, Delete, new Resource(Topic, topicPartition.topic)))
- unauthorizedTopics += topicPartition
+ unauthorizedTopicResponses += topicPartition -> new DeleteRecordsResponse.PartitionResponse(
+ DeleteRecordsResponse.INVALID_LOW_WATERMARK, Errors.TOPIC_AUTHORIZATION_FAILED)
else if (!metadataCache.contains(topicPartition.topic))
- nonExistingTopics += topicPartition
+ nonExistingTopicResponses += topicPartition -> new DeleteRecordsResponse.PartitionResponse(
+ DeleteRecordsResponse.INVALID_LOW_WATERMARK, Errors.UNKNOWN_TOPIC_OR_PARTITION)
else
- authorizedForDeleteTopics += (topicPartition -> offset)
+ authorizedForDeleteTopicOffsets += (topicPartition -> offset)
}
// the callback for sending a DeleteRecordsResponse
- def sendResponseCallback(responseStatus: Map[TopicPartition, DeleteRecordsResponse.PartitionResponse]) {
-
- val mergedResponseStatus = responseStatus ++
- unauthorizedTopics.map(_ ->
- new DeleteRecordsResponse.PartitionResponse(DeleteRecordsResponse.INVALID_LOW_WATERMARK, Errors.TOPIC_AUTHORIZATION_FAILED)) ++
- nonExistingTopics.map(_ ->
- new DeleteRecordsResponse.PartitionResponse(DeleteRecordsResponse.INVALID_LOW_WATERMARK, Errors.UNKNOWN_TOPIC_OR_PARTITION))
-
+ def sendResponseCallback(authorizedTopicResponses: Map[TopicPartition, DeleteRecordsResponse.PartitionResponse]) {
+ val mergedResponseStatus = authorizedTopicResponses ++ unauthorizedTopicResponses ++ nonExistingTopicResponses
mergedResponseStatus.foreach { case (topicPartition, status) =>
if (status.error != Errors.NONE) {
debug("DeleteRecordsRequest with correlation id %d from client %s on partition %s failed due to %s".format(
@@ -1475,13 +1459,13 @@ class KafkaApis(val requestChannel: RequestChannel,
new DeleteRecordsResponse(requestThrottleMs, mergedResponseStatus.asJava))
}
- if (authorizedForDeleteTopics.isEmpty)
+ if (authorizedForDeleteTopicOffsets.isEmpty)
sendResponseCallback(Map.empty)
else {
// call the replica manager to append messages to the replicas
replicaManager.deleteRecords(
deleteRecordsRequest.timeout.toLong,
- authorizedForDeleteTopics.mapValues(_.toLong),
+ authorizedForDeleteTopicOffsets,
sendResponseCallback)
}
}
@@ -1650,45 +1634,38 @@ class KafkaApis(val requestChannel: RequestChannel,
ensureInterBrokerVersion(KAFKA_0_11_0_IV0)
val addPartitionsToTxnRequest = request.body[AddPartitionsToTxnRequest]
val transactionalId = addPartitionsToTxnRequest.transactionalId
- val partitionsToAdd = addPartitionsToTxnRequest.partitions
+ val partitionsToAdd = addPartitionsToTxnRequest.partitions.asScala
if (!authorize(request.session, Write, new Resource(TransactionalId, transactionalId)))
sendResponseMaybeThrottle(request, requestThrottleMs =>
addPartitionsToTxnRequest.getErrorResponse(requestThrottleMs, Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED.exception))
else {
- val internalTopics = partitionsToAdd.asScala.filter {tp => org.apache.kafka.common.internals.Topic.isInternal(tp.topic())}
-
- var unauthorizedTopics = Set[TopicPartition]()
- var nonExistingTopics = Set[TopicPartition]()
- var authorizedPartitions = Set[TopicPartition]()
-
- for ( topicPartition <- partitionsToAdd.asScala) {
- if (!authorize(request.session, Write, new Resource(Topic, topicPartition.topic)))
- unauthorizedTopics += topicPartition
+ val unauthorizedTopicErrors = mutable.Map[TopicPartition, Errors]()
+ val nonExistingTopicErrors = mutable.Map[TopicPartition, Errors]()
+ val authorizedPartitions = mutable.Set[TopicPartition]()
+
+ for (topicPartition <- partitionsToAdd) {
+ if (org.apache.kafka.common.internals.Topic.isInternal(topicPartition.topic) ||
+ !authorize(request.session, Write, new Resource(Topic, topicPartition.topic)))
+ unauthorizedTopicErrors += topicPartition -> Errors.TOPIC_AUTHORIZATION_FAILED
else if (!metadataCache.contains(topicPartition.topic))
- nonExistingTopics += topicPartition
+ nonExistingTopicErrors += topicPartition -> Errors.UNKNOWN_TOPIC_OR_PARTITION
else
- authorizedPartitions += topicPartition
+ authorizedPartitions.add(topicPartition)
}
- if (unauthorizedTopics.nonEmpty
- || nonExistingTopics.nonEmpty
- || internalTopics.nonEmpty) {
-
+ if (unauthorizedTopicErrors.nonEmpty || nonExistingTopicErrors.nonEmpty) {
// Any failed partition check causes the entire request to fail. We send the appropriate error codes for the
// partitions which failed, and an 'OPERATION_NOT_ATTEMPTED' error code for the partitions which succeeded
// the authorization check to indicate that they were not added to the transaction.
- val partitionErrors = (unauthorizedTopics.map(_ -> Errors.TOPIC_AUTHORIZATION_FAILED) ++
- nonExistingTopics.map(_ -> Errors.UNKNOWN_TOPIC_OR_PARTITION) ++
- internalTopics.map(_ -> Errors.TOPIC_AUTHORIZATION_FAILED) ++
- authorizedPartitions.map(_ -> Errors.OPERATION_NOT_ATTEMPTED)).toMap
-
+ val partitionErrors = unauthorizedTopicErrors ++ nonExistingTopicErrors ++
+ authorizedPartitions.map(_ -> Errors.OPERATION_NOT_ATTEMPTED)
sendResponseMaybeThrottle(request, requestThrottleMs =>
new AddPartitionsToTxnResponse(requestThrottleMs, partitionErrors.asJava))
} else {
def sendResponseCallback(error: Errors): Unit = {
def createResponse(requestThrottleMs: Int): AbstractResponse = {
val responseBody: AddPartitionsToTxnResponse = new AddPartitionsToTxnResponse(requestThrottleMs,
- partitionsToAdd.asScala.map{tp => (tp, error)}.toMap.asJava)
+ partitionsToAdd.map{tp => (tp, error)}.toMap.asJava)
trace(s"Completed $transactionalId's AddPartitionsToTxnRequest with partitions $partitionsToAdd: errors: $error from client ${request.header.clientId}")
responseBody
}
@@ -1699,7 +1676,7 @@ class KafkaApis(val requestChannel: RequestChannel,
txnCoordinator.handleAddPartitionsToTransaction(transactionalId,
addPartitionsToTxnRequest.producerId,
addPartitionsToTxnRequest.producerEpoch,
- partitionsToAdd.asScala.toSet,
+ authorizedPartitions,
sendResponseCallback)
}
}
@@ -1749,25 +1726,22 @@ class KafkaApis(val requestChannel: RequestChannel,
else if (!authorize(request.session, Read, new Resource(Group, txnOffsetCommitRequest.consumerGroupId)))
sendErrorResponseMaybeThrottle(request, Errors.GROUP_AUTHORIZATION_FAILED.exception)
else {
- var unauthorizedTopics = Set[TopicPartition]()
- var nonExistingTopics = Set[TopicPartition]()
- var authorizedTopics = mutable.Map[TopicPartition, TxnOffsetCommitRequest.CommittedOffset]()
+ val unauthorizedTopicErrors = mutable.Map[TopicPartition, Errors]()
+ val nonExistingTopicErrors = mutable.Map[TopicPartition, Errors]()
+ val authorizedTopicCommittedOffsets = mutable.Map[TopicPartition, TxnOffsetCommitRequest.CommittedOffset]()
for ((topicPartition, commitedOffset) <- txnOffsetCommitRequest.offsets.asScala) {
if (!authorize(request.session, Read, new Resource(Topic, topicPartition.topic)))
- unauthorizedTopics += topicPartition
+ unauthorizedTopicErrors += topicPartition -> Errors.TOPIC_AUTHORIZATION_FAILED
else if (!metadataCache.contains(topicPartition.topic))
- nonExistingTopics += topicPartition
+ nonExistingTopicErrors += topicPartition -> Errors.UNKNOWN_TOPIC_OR_PARTITION
else
- authorizedTopics += (topicPartition -> commitedOffset)
+ authorizedTopicCommittedOffsets += (topicPartition -> commitedOffset)
}
// the callback for sending an offset commit response
- def sendResponseCallback(commitStatus: Map[TopicPartition, Errors]) {
- val combinedCommitStatus = commitStatus ++
- unauthorizedTopics.map(_ -> Errors.TOPIC_AUTHORIZATION_FAILED) ++
- nonExistingTopics.map(_ -> Errors.UNKNOWN_TOPIC_OR_PARTITION)
-
+ def sendResponseCallback(authorizedTopicErrors: Map[TopicPartition, Errors]) {
+ val combinedCommitStatus = authorizedTopicErrors ++ unauthorizedTopicErrors ++ nonExistingTopicErrors
if (isDebugEnabled)
combinedCommitStatus.foreach { case (topicPartition, error) =>
if (error != Errors.NONE) {
@@ -1779,10 +1753,10 @@ class KafkaApis(val requestChannel: RequestChannel,
new TxnOffsetCommitResponse(requestThrottleMs, combinedCommitStatus.asJava))
}
- if (authorizedTopics.isEmpty)
+ if (authorizedTopicCommittedOffsets.isEmpty)
sendResponseCallback(Map.empty)
else {
- val offsetMetadata = convertTxnOffsets(authorizedTopics.toMap)
+ val offsetMetadata = convertTxnOffsets(authorizedTopicCommittedOffsets.toMap)
groupCoordinator.handleTxnCommitOffsets(
txnOffsetCommitRequest.consumerGroupId,
txnOffsetCommitRequest.producerId,
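One more copy-avoidance detail from the OffsetCommit hunk above: building the authorized entries with an immutable Map builder yields an immutable.Map directly, which is why the .toMap calls before the ZK response and the group-coordinator call could be dropped. A sketch with a simplified value type:

    import scala.collection.immutable
    import org.apache.kafka.common.TopicPartition

    // Value type simplified to Long (the real code maps to OffsetCommitRequest.PartitionData)
    val bldr = immutable.Map.newBuilder[TopicPartition, Long]
    bldr += (new TopicPartition("topic", 0) -> 42L)
    val authorized: immutable.Map[TopicPartition, Long] = bldr.result()  // already immutable; no .toMap
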
http://git-wip-us.apache.org/repos/asf/kafka/blob/1027ff3c/core/src/main/scala/kafka/utils/ZkUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala
index 0b5d43e..1df0916 100644
--- a/core/src/main/scala/kafka/utils/ZkUtils.scala
+++ b/core/src/main/scala/kafka/utils/ZkUtils.scala
@@ -39,7 +39,6 @@ import org.apache.zookeeper.AsyncCallback.{DataCallback, StringCallback}
import org.apache.zookeeper.KeeperException.Code
import org.apache.zookeeper.data.{ACL, Stat}
import org.apache.zookeeper.{CreateMode, KeeperException, ZooDefs, ZooKeeper}
-import kafka.utils.Json._
import scala.collection._
import scala.collection.JavaConverters._
@@ -494,7 +493,7 @@ class ZkUtils(zkClientWrap: ZooKeeperClientWrapper,
/**
* Create an ephemeral node with the given path and data. Create parents if necessary.
*/
- private def createEphemeralPath(path: String, data: String, acls: java.util.List[ACL] = UseDefaultAcls): Unit = {
+ private def createEphemeralPath(path: String, data: String, acls: java.util.List[ACL]): Unit = {
val acl = if (acls eq UseDefaultAcls) ZkUtils.defaultAcls(isSecure, path) else acls
try {
zkPath.createEphemeral(path, data, acl)
http://git-wip-us.apache.org/repos/asf/kafka/blob/1027ff3c/core/src/main/scala/kafka/utils/json/JsonValue.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/json/JsonValue.scala b/core/src/main/scala/kafka/utils/json/JsonValue.scala
index 2be1880..cbc82c0 100644
--- a/core/src/main/scala/kafka/utils/json/JsonValue.scala
+++ b/core/src/main/scala/kafka/utils/json/JsonValue.scala
@@ -17,8 +17,6 @@
package kafka.utils.json
-import scala.collection._
-
import com.fasterxml.jackson.databind.{JsonMappingException, JsonNode}
import com.fasterxml.jackson.databind.node.{ArrayNode, ObjectNode}