You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jj...@apache.org on 2015/03/03 20:46:27 UTC

[1/3] kafka git commit: KAFKA-1852; Reject offset commits to unknown topics; reviewed by Joel Koshy

Repository: kafka
Updated Branches:
  refs/heads/trunk 1cd6ed9e2 -> 57d38f672


KAFKA-1852; Reject offset commits to unknown topics; reviewed by Joel Koshy


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/616987d1
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/616987d1
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/616987d1

Branch: refs/heads/trunk
Commit: 616987d196b654486a1261f4eed50e48560e3041
Parents: 1cd6ed9
Author: Sriharsha Chintalapani <sc...@hortonworks.com>
Authored: Tue Mar 3 11:16:38 2015 -0800
Committer: Joel Koshy <jj...@gmail.com>
Committed: Tue Mar 3 11:16:38 2015 -0800

----------------------------------------------------------------------
 .../src/main/scala/kafka/server/KafkaApis.scala | 22 +++++++++-----------
 .../main/scala/kafka/server/KafkaServer.scala   |  8 +++++--
 .../main/scala/kafka/server/MetadataCache.scala |  7 ++++++-
 .../main/scala/kafka/server/OffsetManager.scala | 19 ++++++++++++-----
 .../unit/kafka/server/OffsetCommitTest.scala    | 19 +++++++++++++++++
 5 files changed, 55 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/616987d1/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 703886a..35af98f 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -45,10 +45,10 @@ class KafkaApis(val requestChannel: RequestChannel,
                 val controller: KafkaController,
                 val zkClient: ZkClient,
                 val brokerId: Int,
-                val config: KafkaConfig) extends Logging {
+                val config: KafkaConfig,
+                val metadataCache: MetadataCache) extends Logging {
 
   this.logIdent = "[KafkaApi-%d] ".format(brokerId)
-  val metadataCache = new MetadataCache(brokerId)
 
   /**
    * Top-level method that handles all requests and multiplexes to the right api
@@ -149,7 +149,6 @@ class KafkaApis(val requestChannel: RequestChannel,
       val response = OffsetCommitResponse(commitStatus, offsetCommitRequest.correlationId)
       requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response)))
     }
-
     // call offset manager to store offsets
     offsetManager.storeOffsets(
       offsetCommitRequest.groupId,
@@ -273,7 +272,7 @@ class KafkaApis(val requestChannel: RequestChannel,
             val hw = localReplica.highWatermark.messageOffset
             if (allOffsets.exists(_ > hw))
               hw +: allOffsets.dropWhile(_ > hw)
-            else 
+            else
               allOffsets
           }
         }
@@ -297,19 +296,19 @@ class KafkaApis(val requestChannel: RequestChannel,
     val response = OffsetResponse(offsetRequest.correlationId, responseMap)
     requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response)))
   }
-  
+
   def fetchOffsets(logManager: LogManager, topicAndPartition: TopicAndPartition, timestamp: Long, maxNumOffsets: Int): Seq[Long] = {
     logManager.getLog(topicAndPartition) match {
-      case Some(log) => 
+      case Some(log) =>
         fetchOffsetsBefore(log, timestamp, maxNumOffsets)
-      case None => 
+      case None =>
         if (timestamp == OffsetRequest.LatestTime || timestamp == OffsetRequest.EarliestTime)
           Seq(0L)
         else
           Nil
     }
   }
-  
+
   private def fetchOffsetsBefore(log: Log, timestamp: Long, maxNumOffsets: Int): Seq[Long] = {
     val segsArray = log.logSegments.toArray
     var offsetTimeArray: Array[(Long, Long)] = null
@@ -454,7 +453,7 @@ class KafkaApis(val requestChannel: RequestChannel,
     import JavaConversions._
 
     val joinGroupRequest = request.requestObj.asInstanceOf[JoinGroupRequestAndHeader]
-    
+
     // the callback for sending a join-group response
     def sendResponseCallback(partitions: List[TopicAndPartition], generationId: Int, errorCode: Short) {
       val partitionList = partitions.map(tp => new TopicPartition(tp.topic, tp.partition)).toBuffer
@@ -472,7 +471,7 @@ class KafkaApis(val requestChannel: RequestChannel,
       joinGroupRequest.body.strategy(),
       sendResponseCallback)
   }
-  
+
   def handleHeartbeatRequest(request: RequestChannel.Request) {
     val heartbeatRequest = request.requestObj.asInstanceOf[HeartbeatRequestAndHeader]
 
@@ -489,11 +488,10 @@ class KafkaApis(val requestChannel: RequestChannel,
       heartbeatRequest.body.groupGenerationId(),
       sendResponseCallback)
   }
-  
+
   def close() {
     // TODO currently closing the API is an no-op since the API no longer maintain any modules
     // maybe removing the closing call in the end when KafkaAPI becomes a pure stateless layer
     debug("Shut down complete.")
   }
 }
-

http://git-wip-us.apache.org/repos/asf/kafka/blob/616987d1/core/src/main/scala/kafka/server/KafkaServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index 426e522..8e3def9 100644
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -68,6 +68,9 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
   val kafkaScheduler = new KafkaScheduler(config.backgroundThreads)
 
   var kafkaHealthcheck: KafkaHealthcheck = null
+  val metadataCache: MetadataCache = new MetadataCache(config.brokerId)
+
+
 
   var zkClient: ZkClient = null
   val correlationId: AtomicInteger = new AtomicInteger(0)
@@ -142,7 +145,8 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
         consumerCoordinator.startup()
 
         /* start processing requests */
-        apis = new KafkaApis(socketServer.requestChannel, replicaManager, offsetManager, consumerCoordinator, kafkaController, zkClient, config.brokerId, config)
+        apis = new KafkaApis(socketServer.requestChannel, replicaManager, offsetManager, consumerCoordinator,
+          kafkaController, zkClient, config.brokerId, config, metadataCache)
         requestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.requestChannel, apis, config.numIoThreads)
         brokerState.newState(RunningAsBroker)
 
@@ -402,7 +406,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
       offsetsTopicReplicationFactor = config.offsetsTopicReplicationFactor,
       offsetCommitTimeoutMs = config.offsetCommitTimeoutMs,
       offsetCommitRequiredAcks = config.offsetCommitRequiredAcks)
-    new OffsetManager(offsetManagerConfig, replicaManager, zkClient, kafkaScheduler)
+    new OffsetManager(offsetManagerConfig, replicaManager, zkClient, kafkaScheduler, metadataCache)
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/kafka/blob/616987d1/core/src/main/scala/kafka/server/MetadataCache.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/MetadataCache.scala b/core/src/main/scala/kafka/server/MetadataCache.scala
index 4c70aa7..6aef6e4 100644
--- a/core/src/main/scala/kafka/server/MetadataCache.scala
+++ b/core/src/main/scala/kafka/server/MetadataCache.scala
@@ -136,6 +136,12 @@ private[server] class MetadataCache(brokerId: Int) extends Logging {
     }
   }
 
+  def contains(topic: String): Boolean = {
+    inReadLock(partitionMetadataLock) {
+      cache.contains(topic)
+    }
+  }
+
   private def removePartitionInfo(topic: String, partitionId: Int) = {
     cache.get(topic) match {
       case Some(infos) => {
@@ -149,4 +155,3 @@ private[server] class MetadataCache(brokerId: Int) extends Logging {
     }
   }
 }
-

http://git-wip-us.apache.org/repos/asf/kafka/blob/616987d1/core/src/main/scala/kafka/server/OffsetManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/OffsetManager.scala b/core/src/main/scala/kafka/server/OffsetManager.scala
index c602a80..d2d5962 100644
--- a/core/src/main/scala/kafka/server/OffsetManager.scala
+++ b/core/src/main/scala/kafka/server/OffsetManager.scala
@@ -86,7 +86,8 @@ object OffsetManagerConfig {
 class OffsetManager(val config: OffsetManagerConfig,
                     replicaManager: ReplicaManager,
                     zkClient: ZkClient,
-                    scheduler: Scheduler) extends Logging with KafkaMetricsGroup {
+                    scheduler: Scheduler,
+                    metadataCache: MetadataCache) extends Logging with KafkaMetricsGroup {
 
   /* offsets and metadata cache */
   private val offsetsCache = new Pool[GroupTopicPartition, OffsetAndMetadata]
@@ -164,6 +165,7 @@ class OffsetManager(val config: OffsetManagerConfig,
     debug("Removed %d stale offsets in %d milliseconds.".format(numRemoved, SystemTime.milliseconds - startMs))
   }
 
+
   def offsetsTopicConfig: Properties = {
     val props = new Properties
     props.put(LogConfig.SegmentBytesProp, config.offsetsTopicSegmentBytes.toString)
@@ -214,11 +216,16 @@ class OffsetManager(val config: OffsetManagerConfig,
                    generationId: Int,
                    offsetMetadata: immutable.Map[TopicAndPartition, OffsetAndMetadata],
                    responseCallback: immutable.Map[TopicAndPartition, Short] => Unit) {
+    // check if there are any non-existent topics
+    val nonExistentTopics = offsetMetadata.filter { case (topicAndPartition, offsetMetadata) =>
+      !metadataCache.contains(topicAndPartition.topic)
+    }
 
-    // first filter out partitions with offset metadata size exceeding limit
+    // first filter out partitions with offset metadata size exceeding limit or
+    // if its a non existing topic
     // TODO: in the future we may want to only support atomic commit and hence fail the whole commit
     val filteredOffsetMetadata = offsetMetadata.filter { case (topicAndPartition, offsetAndMetadata) =>
-      validateOffsetMetadataLength(offsetAndMetadata.metadata)
+      validateOffsetMetadataLength(offsetAndMetadata.metadata) || nonExistentTopics.contains(topicAndPartition)
     }
 
     // construct the message set to append
@@ -242,7 +249,7 @@ class OffsetManager(val config: OffsetManagerConfig,
           .format(responseStatus, offsetTopicPartition))
 
       // construct the commit response status and insert
-      // the offset and metadata to cache iff the append status has no error
+      // the offset and metadata to cache if the append status has no error
       val status = responseStatus(offsetTopicPartition)
 
       val responseCode =
@@ -267,7 +274,9 @@ class OffsetManager(val config: OffsetManagerConfig,
 
       // compute the final error codes for the commit response
       val commitStatus = offsetMetadata.map { case (topicAndPartition, offsetAndMetadata) =>
-        if (validateOffsetMetadataLength(offsetAndMetadata.metadata))
+        if (nonExistentTopics.contains(topicAndPartition))
+          (topicAndPartition, ErrorMapping.UnknownTopicOrPartitionCode)
+        else if (validateOffsetMetadataLength(offsetAndMetadata.metadata))
           (topicAndPartition, responseCode)
         else
           (topicAndPartition, ErrorMapping.OffsetMetadataTooLargeCode)

http://git-wip-us.apache.org/repos/asf/kafka/blob/616987d1/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala b/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala
index a2bb885..a37a74d 100644
--- a/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala
+++ b/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala
@@ -206,4 +206,23 @@ class OffsetCommitTest extends JUnit3Suite with ZooKeeperTestHarness {
     assertEquals(ErrorMapping.OffsetMetadataTooLargeCode, commitResponse1.commitStatus.get(topicAndPartition).get)
 
   }
+
+  @Test
+  def testNonExistingTopicOffsetCommit() {
+    val topic1 = "topicDoesNotExists"
+    val topic2 = "topic-2"
+
+    createTopic(zkClient, topic2, servers = Seq(server), numPartitions = 1)
+
+    // Commit an offset
+    val expectedReplicaAssignment = Map(0  -> List(1))
+    val commitRequest = OffsetCommitRequest(group, immutable.Map(
+      TopicAndPartition(topic1, 0) -> OffsetAndMetadata(offset=42L),
+      TopicAndPartition(topic2, 0) -> OffsetAndMetadata(offset=42L)
+    ))
+    val commitResponse = simpleConsumer.commitOffsets(commitRequest)
+
+    assertEquals(ErrorMapping.UnknownTopicOrPartitionCode, commitResponse.commitStatus.get(TopicAndPartition(topic1, 0)).get)
+    assertEquals(ErrorMapping.NoError, commitResponse.commitStatus.get(TopicAndPartition(topic2, 0)).get)
+  }
 }


[2/3] kafka git commit: KAFKA-1499; trivial follow-up (remove unnecessary parentheses)

Posted by jj...@apache.org.
KAFKA-1499; trivial follow-up (remove unnecessary parentheses)


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/c5d654ac
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/c5d654ac
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/c5d654ac

Branch: refs/heads/trunk
Commit: c5d654acb2097eabb1784dcc88145e111a3d037b
Parents: 616987d
Author: Joel Koshy <jj...@gmail.com>
Authored: Tue Mar 3 11:18:07 2015 -0800
Committer: Joel Koshy <jj...@gmail.com>
Committed: Tue Mar 3 11:18:07 2015 -0800

----------------------------------------------------------------------
 core/src/main/scala/kafka/message/CompressionCodec.scala | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/c5d654ac/core/src/main/scala/kafka/message/CompressionCodec.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/message/CompressionCodec.scala b/core/src/main/scala/kafka/message/CompressionCodec.scala
index c4aa8ce..4d7ce17 100644
--- a/core/src/main/scala/kafka/message/CompressionCodec.scala
+++ b/core/src/main/scala/kafka/message/CompressionCodec.scala
@@ -43,7 +43,7 @@ object BrokerCompressionCodec {
   val brokerCompressionCodecs = List(UncompressedCodec, SnappyCompressionCodec, LZ4CompressionCodec, GZIPCompressionCodec, ProducerCompressionCodec)
   val brokerCompressionOptions = brokerCompressionCodecs.map(codec => codec.name)
 
-  def isValid(compressionType: String): Boolean = brokerCompressionOptions.contains((compressionType.toLowerCase()))
+  def isValid(compressionType: String): Boolean = brokerCompressionOptions.contains(compressionType.toLowerCase())
 
   def getCompressionCodec(compressionType: String): CompressionCodec = {
     compressionType.toLowerCase match {
@@ -94,4 +94,4 @@ case object UncompressedCodec extends BrokerCompressionCodec {
 
 case object ProducerCompressionCodec extends BrokerCompressionCodec {
   val name = "producer"
-}
\ No newline at end of file
+}


[3/3] kafka git commit: KAFKA-1986; Request failure rate should not include invalid message size and offset out of range; reviewed by Joel Koshy

Posted by jj...@apache.org.
KAFKA-1986; Request failure rate should not include invalid message size and offset out of range; reviewed by Joel Koshy


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/57d38f67
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/57d38f67
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/57d38f67

Branch: refs/heads/trunk
Commit: 57d38f672bcb85fdb20d8ca3fab9bd60d1bc8965
Parents: c5d654a
Author: Aditya Auradkar <aa...@linkedin.com>
Authored: Tue Mar 3 11:21:04 2015 -0800
Committer: Joel Koshy <jj...@gmail.com>
Committed: Tue Mar 3 11:21:04 2015 -0800

----------------------------------------------------------------------
 core/src/main/scala/kafka/server/ReplicaManager.scala | 4 ++++
 1 file changed, 4 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/57d38f67/core/src/main/scala/kafka/server/ReplicaManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 586cf4c..c527482 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -374,6 +374,8 @@ class ReplicaManager(val config: KafkaConfig,
             (topicAndPartition, LogAppendResult(LogAppendInfo.UnknownLogAppendInfo, Some(mtl)))
           case mstl: MessageSetSizeTooLargeException =>
             (topicAndPartition, LogAppendResult(LogAppendInfo.UnknownLogAppendInfo, Some(mstl)))
+          case imse : InvalidMessageSizeException =>
+            (topicAndPartition, LogAppendResult(LogAppendInfo.UnknownLogAppendInfo, Some(imse)))
           case t: Throwable =>
             BrokerTopicStats.getBrokerTopicStats(topicAndPartition.topic).failedProduceRequestRate.mark()
             BrokerTopicStats.getBrokerAllTopicsStats.failedProduceRequestRate.mark()
@@ -483,6 +485,8 @@ class ReplicaManager(val config: KafkaConfig,
             LogReadResult(FetchDataInfo(LogOffsetMetadata.UnknownOffsetMetadata, MessageSet.Empty), -1L, fetchSize, Some(nle))
           case rnae: ReplicaNotAvailableException =>
             LogReadResult(FetchDataInfo(LogOffsetMetadata.UnknownOffsetMetadata, MessageSet.Empty), -1L, fetchSize, Some(rnae))
+          case oor : OffsetOutOfRangeException =>
+            LogReadResult(FetchDataInfo(LogOffsetMetadata.UnknownOffsetMetadata, MessageSet.Empty), -1L, fetchSize, Some(oor))
           case e: Throwable =>
             BrokerTopicStats.getBrokerTopicStats(topic).failedFetchRequestRate.mark()
             BrokerTopicStats.getBrokerAllTopicsStats().failedFetchRequestRate.mark()