Posted to commits@kafka.apache.org by jj...@apache.org on 2015/03/03 20:46:27 UTC
[1/3] kafka git commit: KAFKA-1852; Reject offset commits to unknown topics; reviewed by Joel Koshy
Repository: kafka
Updated Branches:
refs/heads/trunk 1cd6ed9e2 -> 57d38f672
KAFKA-1852; Reject offset commits to unknown topics; reviewed by Joel Koshy
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/616987d1
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/616987d1
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/616987d1
Branch: refs/heads/trunk
Commit: 616987d196b654486a1261f4eed50e48560e3041
Parents: 1cd6ed9
Author: Sriharsha Chintalapani <sc...@hortonworks.com>
Authored: Tue Mar 3 11:16:38 2015 -0800
Committer: Joel Koshy <jj...@gmail.com>
Committed: Tue Mar 3 11:16:38 2015 -0800
----------------------------------------------------------------------
.../src/main/scala/kafka/server/KafkaApis.scala | 22 +++++++++-----------
.../main/scala/kafka/server/KafkaServer.scala | 8 +++++--
.../main/scala/kafka/server/MetadataCache.scala | 7 ++++++-
.../main/scala/kafka/server/OffsetManager.scala | 19 ++++++++++++-----
.../unit/kafka/server/OffsetCommitTest.scala | 19 +++++++++++++++++
5 files changed, 55 insertions(+), 20 deletions(-)
----------------------------------------------------------------------
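In short, this commit has OffsetManager consult the broker's MetadataCache before accepting an offset commit, so commits for topics the broker has never heard of are answered with UnknownTopicOrPartitionCode instead of being silently stored. Below is a minimal, self-contained sketch of that check; the names mirror the diffs that follow, but the types and error-code constants here are simplified stand-ins, not the real kafka.server and kafka.common classes.

// Sketch only: placeholder types modeled on the KAFKA-1852 change.
object UnknownTopicCommitSketch {

  case class TopicAndPartition(topic: String, partition: Int)
  case class OffsetAndMetadata(offset: Long, metadata: String = "")

  // Stand-in for MetadataCache exposing only the contains(topic) lookup added by this commit.
  class MetadataCache(knownTopics: Set[String]) {
    def contains(topic: String): Boolean = knownTopics.contains(topic)
  }

  // Error-code values mirroring kafka.common.ErrorMapping.
  val NoError: Short = 0
  val UnknownTopicOrPartitionCode: Short = 3

  // For each partition in the commit request, reject those whose topic the broker does not know.
  def commitStatus(cache: MetadataCache,
                   offsets: Map[TopicAndPartition, OffsetAndMetadata]): Map[TopicAndPartition, Short] =
    offsets.map { case (tp, _) =>
      if (cache.contains(tp.topic)) tp -> NoError
      else tp -> UnknownTopicOrPartitionCode
    }

  def main(args: Array[String]): Unit = {
    val cache = new MetadataCache(Set("topic-2"))
    val offsets = Map(
      TopicAndPartition("topicDoesNotExists", 0) -> OffsetAndMetadata(offset = 42L),
      TopicAndPartition("topic-2", 0)            -> OffsetAndMetadata(offset = 42L))
    println(commitStatus(cache, offsets)) // unknown topic -> 3, existing topic -> 0
  }
}

Running the sketch yields UnknownTopicOrPartitionCode for the unknown topic and NoError for the existing one, which is what the new OffsetCommitTest case further below asserts.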
http://git-wip-us.apache.org/repos/asf/kafka/blob/616987d1/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 703886a..35af98f 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -45,10 +45,10 @@ class KafkaApis(val requestChannel: RequestChannel,
val controller: KafkaController,
val zkClient: ZkClient,
val brokerId: Int,
- val config: KafkaConfig) extends Logging {
+ val config: KafkaConfig,
+ val metadataCache: MetadataCache) extends Logging {
this.logIdent = "[KafkaApi-%d] ".format(brokerId)
- val metadataCache = new MetadataCache(brokerId)
/**
* Top-level method that handles all requests and multiplexes to the right api
@@ -149,7 +149,6 @@ class KafkaApis(val requestChannel: RequestChannel,
val response = OffsetCommitResponse(commitStatus, offsetCommitRequest.correlationId)
requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response)))
}
-
// call offset manager to store offsets
offsetManager.storeOffsets(
offsetCommitRequest.groupId,
@@ -273,7 +272,7 @@ class KafkaApis(val requestChannel: RequestChannel,
val hw = localReplica.highWatermark.messageOffset
if (allOffsets.exists(_ > hw))
hw +: allOffsets.dropWhile(_ > hw)
- else
+ else
allOffsets
}
}
@@ -297,19 +296,19 @@ class KafkaApis(val requestChannel: RequestChannel,
val response = OffsetResponse(offsetRequest.correlationId, responseMap)
requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response)))
}
-
+
def fetchOffsets(logManager: LogManager, topicAndPartition: TopicAndPartition, timestamp: Long, maxNumOffsets: Int): Seq[Long] = {
logManager.getLog(topicAndPartition) match {
- case Some(log) =>
+ case Some(log) =>
fetchOffsetsBefore(log, timestamp, maxNumOffsets)
- case None =>
+ case None =>
if (timestamp == OffsetRequest.LatestTime || timestamp == OffsetRequest.EarliestTime)
Seq(0L)
else
Nil
}
}
-
+
private def fetchOffsetsBefore(log: Log, timestamp: Long, maxNumOffsets: Int): Seq[Long] = {
val segsArray = log.logSegments.toArray
var offsetTimeArray: Array[(Long, Long)] = null
@@ -454,7 +453,7 @@ class KafkaApis(val requestChannel: RequestChannel,
import JavaConversions._
val joinGroupRequest = request.requestObj.asInstanceOf[JoinGroupRequestAndHeader]
-
+
// the callback for sending a join-group response
def sendResponseCallback(partitions: List[TopicAndPartition], generationId: Int, errorCode: Short) {
val partitionList = partitions.map(tp => new TopicPartition(tp.topic, tp.partition)).toBuffer
@@ -472,7 +471,7 @@ class KafkaApis(val requestChannel: RequestChannel,
joinGroupRequest.body.strategy(),
sendResponseCallback)
}
-
+
def handleHeartbeatRequest(request: RequestChannel.Request) {
val heartbeatRequest = request.requestObj.asInstanceOf[HeartbeatRequestAndHeader]
@@ -489,11 +488,10 @@ class KafkaApis(val requestChannel: RequestChannel,
heartbeatRequest.body.groupGenerationId(),
sendResponseCallback)
}
-
+
def close() {
// TODO currently closing the API is an no-op since the API no longer maintain any modules
// maybe removing the closing call in the end when KafkaAPI becomes a pure stateless layer
debug("Shut down complete.")
}
}
-
http://git-wip-us.apache.org/repos/asf/kafka/blob/616987d1/core/src/main/scala/kafka/server/KafkaServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index 426e522..8e3def9 100644
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -68,6 +68,9 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
val kafkaScheduler = new KafkaScheduler(config.backgroundThreads)
var kafkaHealthcheck: KafkaHealthcheck = null
+ val metadataCache: MetadataCache = new MetadataCache(config.brokerId)
+
+
var zkClient: ZkClient = null
val correlationId: AtomicInteger = new AtomicInteger(0)
@@ -142,7 +145,8 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
consumerCoordinator.startup()
/* start processing requests */
- apis = new KafkaApis(socketServer.requestChannel, replicaManager, offsetManager, consumerCoordinator, kafkaController, zkClient, config.brokerId, config)
+ apis = new KafkaApis(socketServer.requestChannel, replicaManager, offsetManager, consumerCoordinator,
+ kafkaController, zkClient, config.brokerId, config, metadataCache)
requestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.requestChannel, apis, config.numIoThreads)
brokerState.newState(RunningAsBroker)
@@ -402,7 +406,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
offsetsTopicReplicationFactor = config.offsetsTopicReplicationFactor,
offsetCommitTimeoutMs = config.offsetCommitTimeoutMs,
offsetCommitRequiredAcks = config.offsetCommitRequiredAcks)
- new OffsetManager(offsetManagerConfig, replicaManager, zkClient, kafkaScheduler)
+ new OffsetManager(offsetManagerConfig, replicaManager, zkClient, kafkaScheduler, metadataCache)
}
/**
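As the KafkaServer changes above show, the MetadataCache is now constructed once per broker and handed to both KafkaApis and OffsetManager, rather than KafkaApis building its own private instance. A rough sketch of that shared-instance wiring follows, using placeholder classes instead of the real Kafka constructors:

// Sketch only: placeholder classes illustrating the shared MetadataCache wiring.
object SharedCacheWiringSketch {

  class MetadataCache(val brokerId: Int) {
    private val topics = scala.collection.mutable.Set.empty[String]
    def add(topic: String): Unit = topics += topic
    def contains(topic: String): Boolean = topics.contains(topic)
  }

  // Both components receive the same cache rather than each building their own.
  class OffsetManager(metadataCache: MetadataCache) {
    def topicKnown(topic: String): Boolean = metadataCache.contains(topic)
  }
  class KafkaApis(metadataCache: MetadataCache, offsetManager: OffsetManager)

  def main(args: Array[String]): Unit = {
    val metadataCache = new MetadataCache(brokerId = 0) // built once by the server
    val offsetManager = new OffsetManager(metadataCache)
    val apis          = new KafkaApis(metadataCache, offsetManager)
    metadataCache.add("topic-2")
    println(offsetManager.topicKnown("topic-2")) // true: both layers see the same metadata
  }
}

The point of the change is simply that the request-handling layer and the offset manager consult the same view of which topics exist.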
http://git-wip-us.apache.org/repos/asf/kafka/blob/616987d1/core/src/main/scala/kafka/server/MetadataCache.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/MetadataCache.scala b/core/src/main/scala/kafka/server/MetadataCache.scala
index 4c70aa7..6aef6e4 100644
--- a/core/src/main/scala/kafka/server/MetadataCache.scala
+++ b/core/src/main/scala/kafka/server/MetadataCache.scala
@@ -136,6 +136,12 @@ private[server] class MetadataCache(brokerId: Int) extends Logging {
}
}
+ def contains(topic: String): Boolean = {
+ inReadLock(partitionMetadataLock) {
+ cache.contains(topic)
+ }
+ }
+
private def removePartitionInfo(topic: String, partitionId: Int) = {
cache.get(topic) match {
case Some(infos) => {
@@ -149,4 +155,3 @@ private[server] class MetadataCache(brokerId: Int) extends Logging {
}
}
}
-
http://git-wip-us.apache.org/repos/asf/kafka/blob/616987d1/core/src/main/scala/kafka/server/OffsetManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/OffsetManager.scala b/core/src/main/scala/kafka/server/OffsetManager.scala
index c602a80..d2d5962 100644
--- a/core/src/main/scala/kafka/server/OffsetManager.scala
+++ b/core/src/main/scala/kafka/server/OffsetManager.scala
@@ -86,7 +86,8 @@ object OffsetManagerConfig {
class OffsetManager(val config: OffsetManagerConfig,
replicaManager: ReplicaManager,
zkClient: ZkClient,
- scheduler: Scheduler) extends Logging with KafkaMetricsGroup {
+ scheduler: Scheduler,
+ metadataCache: MetadataCache) extends Logging with KafkaMetricsGroup {
/* offsets and metadata cache */
private val offsetsCache = new Pool[GroupTopicPartition, OffsetAndMetadata]
@@ -164,6 +165,7 @@ class OffsetManager(val config: OffsetManagerConfig,
debug("Removed %d stale offsets in %d milliseconds.".format(numRemoved, SystemTime.milliseconds - startMs))
}
+
def offsetsTopicConfig: Properties = {
val props = new Properties
props.put(LogConfig.SegmentBytesProp, config.offsetsTopicSegmentBytes.toString)
@@ -214,11 +216,16 @@ class OffsetManager(val config: OffsetManagerConfig,
generationId: Int,
offsetMetadata: immutable.Map[TopicAndPartition, OffsetAndMetadata],
responseCallback: immutable.Map[TopicAndPartition, Short] => Unit) {
+ // check if there are any non-existent topics
+ val nonExistentTopics = offsetMetadata.filter { case (topicAndPartition, offsetMetadata) =>
+ !metadataCache.contains(topicAndPartition.topic)
+ }
- // first filter out partitions with offset metadata size exceeding limit
+ // first filter out partitions with offset metadata size exceeding limit or
+ // if its a non existing topic
// TODO: in the future we may want to only support atomic commit and hence fail the whole commit
val filteredOffsetMetadata = offsetMetadata.filter { case (topicAndPartition, offsetAndMetadata) =>
- validateOffsetMetadataLength(offsetAndMetadata.metadata)
+ validateOffsetMetadataLength(offsetAndMetadata.metadata) || nonExistentTopics.contains(topicAndPartition)
}
// construct the message set to append
@@ -242,7 +249,7 @@ class OffsetManager(val config: OffsetManagerConfig,
.format(responseStatus, offsetTopicPartition))
// construct the commit response status and insert
- // the offset and metadata to cache iff the append status has no error
+ // the offset and metadata to cache if the append status has no error
val status = responseStatus(offsetTopicPartition)
val responseCode =
@@ -267,7 +274,9 @@ class OffsetManager(val config: OffsetManagerConfig,
// compute the final error codes for the commit response
val commitStatus = offsetMetadata.map { case (topicAndPartition, offsetAndMetadata) =>
- if (validateOffsetMetadataLength(offsetAndMetadata.metadata))
+ if (nonExistentTopics.contains(topicAndPartition))
+ (topicAndPartition, ErrorMapping.UnknownTopicOrPartitionCode)
+ else if (validateOffsetMetadataLength(offsetAndMetadata.metadata))
(topicAndPartition, responseCode)
else
(topicAndPartition, ErrorMapping.OffsetMetadataTooLargeCode)
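The tail of the storeOffsets change above decides the per-partition response code: an unknown topic takes precedence, an oversized metadata string comes next, and only otherwise does the log-append result flow through. A small sketch of that precedence, assuming the error-code values from kafka.common.ErrorMapping (0, 3 and 12 respectively):

// Sketch only: the response-code precedence used for each committed partition.
object CommitResponseCodeSketch {

  val NoError: Short = 0
  val UnknownTopicOrPartitionCode: Short = 3
  val OffsetMetadataTooLargeCode: Short = 12

  // Unknown topics win over oversized metadata; otherwise the append result is returned.
  def responseCode(topicKnown: Boolean, metadataWithinLimit: Boolean, appendCode: Short): Short =
    if (!topicKnown) UnknownTopicOrPartitionCode
    else if (!metadataWithinLimit) OffsetMetadataTooLargeCode
    else appendCode

  def main(args: Array[String]): Unit = {
    println(responseCode(topicKnown = false, metadataWithinLimit = true,  appendCode = NoError)) // 3
    println(responseCode(topicKnown = true,  metadataWithinLimit = false, appendCode = NoError)) // 12
    println(responseCode(topicKnown = true,  metadataWithinLimit = true,  appendCode = NoError)) // 0
  }
}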
http://git-wip-us.apache.org/repos/asf/kafka/blob/616987d1/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala b/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala
index a2bb885..a37a74d 100644
--- a/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala
+++ b/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala
@@ -206,4 +206,23 @@ class OffsetCommitTest extends JUnit3Suite with ZooKeeperTestHarness {
assertEquals(ErrorMapping.OffsetMetadataTooLargeCode, commitResponse1.commitStatus.get(topicAndPartition).get)
}
+
+ @Test
+ def testNonExistingTopicOffsetCommit() {
+ val topic1 = "topicDoesNotExists"
+ val topic2 = "topic-2"
+
+ createTopic(zkClient, topic2, servers = Seq(server), numPartitions = 1)
+
+ // Commit an offset
+ val expectedReplicaAssignment = Map(0 -> List(1))
+ val commitRequest = OffsetCommitRequest(group, immutable.Map(
+ TopicAndPartition(topic1, 0) -> OffsetAndMetadata(offset=42L),
+ TopicAndPartition(topic2, 0) -> OffsetAndMetadata(offset=42L)
+ ))
+ val commitResponse = simpleConsumer.commitOffsets(commitRequest)
+
+ assertEquals(ErrorMapping.UnknownTopicOrPartitionCode, commitResponse.commitStatus.get(TopicAndPartition(topic1, 0)).get)
+ assertEquals(ErrorMapping.NoError, commitResponse.commitStatus.get(TopicAndPartition(topic2, 0)).get)
+ }
}
[2/3] kafka git commit: KAFKA-1499; trivial follow-up (remove unnecessary parentheses)
Posted by jj...@apache.org.
KAFKA-1499; trivial follow-up (remove unnecessary parentheses)
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/c5d654ac
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/c5d654ac
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/c5d654ac
Branch: refs/heads/trunk
Commit: c5d654acb2097eabb1784dcc88145e111a3d037b
Parents: 616987d
Author: Joel Koshy <jj...@gmail.com>
Authored: Tue Mar 3 11:18:07 2015 -0800
Committer: Joel Koshy <jj...@gmail.com>
Committed: Tue Mar 3 11:18:07 2015 -0800
----------------------------------------------------------------------
core/src/main/scala/kafka/message/CompressionCodec.scala | 4 ++--
1 file changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/c5d654ac/core/src/main/scala/kafka/message/CompressionCodec.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/message/CompressionCodec.scala b/core/src/main/scala/kafka/message/CompressionCodec.scala
index c4aa8ce..4d7ce17 100644
--- a/core/src/main/scala/kafka/message/CompressionCodec.scala
+++ b/core/src/main/scala/kafka/message/CompressionCodec.scala
@@ -43,7 +43,7 @@ object BrokerCompressionCodec {
val brokerCompressionCodecs = List(UncompressedCodec, SnappyCompressionCodec, LZ4CompressionCodec, GZIPCompressionCodec, ProducerCompressionCodec)
val brokerCompressionOptions = brokerCompressionCodecs.map(codec => codec.name)
- def isValid(compressionType: String): Boolean = brokerCompressionOptions.contains((compressionType.toLowerCase()))
+ def isValid(compressionType: String): Boolean = brokerCompressionOptions.contains(compressionType.toLowerCase())
def getCompressionCodec(compressionType: String): CompressionCodec = {
compressionType.toLowerCase match {
@@ -94,4 +94,4 @@ case object UncompressedCodec extends BrokerCompressionCodec {
case object ProducerCompressionCodec extends BrokerCompressionCodec {
val name = "producer"
-}
\ No newline at end of file
+}
[3/3] kafka git commit: KAFKA-1986; Request failure rate should not include invalid message size and offset out of range; reviewed by Joel Koshy
Posted by jj...@apache.org.
KAFKA-1986; Request failure rate should not include invalid message size and offset out of range; reviewed by Joel Koshy
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/57d38f67
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/57d38f67
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/57d38f67
Branch: refs/heads/trunk
Commit: 57d38f672bcb85fdb20d8ca3fab9bd60d1bc8965
Parents: c5d654a
Author: Aditya Auradkar <aa...@linkedin.com>
Authored: Tue Mar 3 11:21:04 2015 -0800
Committer: Joel Koshy <jj...@gmail.com>
Committed: Tue Mar 3 11:21:04 2015 -0800
----------------------------------------------------------------------
core/src/main/scala/kafka/server/ReplicaManager.scala | 4 ++++
1 file changed, 4 insertions(+)
----------------------------------------------------------------------
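The fix adds dedicated case clauses for InvalidMessageSizeException (on produce) and OffsetOutOfRangeException (on fetch) so that these expected client errors are returned without marking the failed-request meters; only genuinely unexpected Throwables still count toward the failure rate. A minimal sketch of the pattern with placeholder exception and metric types (not the real ReplicaManager signatures):

// Sketch only: classify expected client errors without bumping the failure-rate meter.
object FailureRateSketch {

  // Placeholder meter standing in for the metrics used by BrokerTopicStats.
  class Meter { var count = 0L; def mark(): Unit = count += 1 }
  val failedProduceRequestRate = new Meter

  // Placeholder for kafka.common.InvalidMessageSizeException.
  final case class InvalidMessageSizeException(msg: String) extends RuntimeException(msg)

  // An error response goes back to the client either way, but only unexpected
  // exceptions are counted as failed requests.
  def handleAppendError(e: Throwable): String = e match {
    case imse: InvalidMessageSizeException =>
      s"expected client error, not counted: ${imse.getMessage}"
    case t: Throwable =>
      failedProduceRequestRate.mark() // only unexpected failures affect the metric
      s"unexpected error, counted: ${t.getMessage}"
  }

  def main(args: Array[String]): Unit = {
    println(handleAppendError(InvalidMessageSizeException("message size is invalid")))
    println(handleAppendError(new IllegalStateException("boom")))
    println(s"failed produce requests = ${failedProduceRequestRate.count}") // prints 1
  }
}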
http://git-wip-us.apache.org/repos/asf/kafka/blob/57d38f67/core/src/main/scala/kafka/server/ReplicaManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 586cf4c..c527482 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -374,6 +374,8 @@ class ReplicaManager(val config: KafkaConfig,
(topicAndPartition, LogAppendResult(LogAppendInfo.UnknownLogAppendInfo, Some(mtl)))
case mstl: MessageSetSizeTooLargeException =>
(topicAndPartition, LogAppendResult(LogAppendInfo.UnknownLogAppendInfo, Some(mstl)))
+ case imse : InvalidMessageSizeException =>
+ (topicAndPartition, LogAppendResult(LogAppendInfo.UnknownLogAppendInfo, Some(imse)))
case t: Throwable =>
BrokerTopicStats.getBrokerTopicStats(topicAndPartition.topic).failedProduceRequestRate.mark()
BrokerTopicStats.getBrokerAllTopicsStats.failedProduceRequestRate.mark()
@@ -483,6 +485,8 @@ class ReplicaManager(val config: KafkaConfig,
LogReadResult(FetchDataInfo(LogOffsetMetadata.UnknownOffsetMetadata, MessageSet.Empty), -1L, fetchSize, Some(nle))
case rnae: ReplicaNotAvailableException =>
LogReadResult(FetchDataInfo(LogOffsetMetadata.UnknownOffsetMetadata, MessageSet.Empty), -1L, fetchSize, Some(rnae))
+ case oor : OffsetOutOfRangeException =>
+ LogReadResult(FetchDataInfo(LogOffsetMetadata.UnknownOffsetMetadata, MessageSet.Empty), -1L, fetchSize, Some(oor))
case e: Throwable =>
BrokerTopicStats.getBrokerTopicStats(topic).failedFetchRequestRate.mark()
BrokerTopicStats.getBrokerAllTopicsStats().failedFetchRequestRate.mark()