You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2012/11/08 07:19:55 UTC

svn commit: r1406940 - in /incubator/kafka/branches/0.8/core/src: main/scala/kafka/consumer/ main/scala/kafka/log/ main/scala/kafka/server/ test/scala/unit/kafka/consumer/ test/scala/unit/kafka/log/ test/scala/unit/kafka/server/

Author: junrao
Date: Thu Nov  8 06:19:55 2012
New Revision: 1406940

URL: http://svn.apache.org/viewvc?rev=1406940&view=rev
Log:
Fix commit() in zk consumer for compressed messages; patched by Swapnil Ghike; reviewed by Jun Rao; KAFKA-546

Added:
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala
Modified:
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerIterator.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/LogManager.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/LogSegment.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaServer.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerIterator.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerIterator.scala?rev=1406940&r1=1406939&r2=1406940&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerIterator.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerIterator.scala Thu Nov  8 06:19:55 2012
@@ -73,13 +73,19 @@ class ConsumerIterator[T](private val ch
         return allDone
       } else {
         currentTopicInfo = currentDataChunk.topicInfo
-        if (currentTopicInfo.getConsumeOffset != currentDataChunk.fetchOffset) {
+        val cdcFetchOffset = currentDataChunk.fetchOffset
+        val ctiConsumeOffset = currentTopicInfo.getConsumeOffset
+        if (ctiConsumeOffset < cdcFetchOffset) {
           error("consumed offset: %d doesn't match fetch offset: %d for %s;\n Consumer may lose data"
-                        .format(currentTopicInfo.getConsumeOffset, currentDataChunk.fetchOffset, currentTopicInfo))
-          currentTopicInfo.resetConsumeOffset(currentDataChunk.fetchOffset)
+            .format(ctiConsumeOffset, cdcFetchOffset, currentTopicInfo))
+          currentTopicInfo.resetConsumeOffset(cdcFetchOffset)
         }
-        localCurrent = if (enableShallowIterator) currentDataChunk.messages.shallowIterator
-                       else currentDataChunk.messages.iterator
+        localCurrent =
+          if (enableShallowIterator)
+            currentDataChunk.messages.shallowIterator
+          else
+            currentDataChunk.messages.iterator
+
         current.set(localCurrent)
       }
       // if we just updated the current chunk and it is empty that means the fetch size is too small!
@@ -88,9 +94,13 @@ class ConsumerIterator[T](private val ch
                                                "%s partition %d at fetch offset %d. Increase the fetch size, or decrease the maximum message size the broker will allow."
                                                .format(currentDataChunk.topicInfo.topic, currentDataChunk.topicInfo.partitionId, currentDataChunk.fetchOffset))
     }
-    val item = localCurrent.next()
+    var item = localCurrent.next()
+    // reject the messages that have already been consumed
+    while (item.offset < currentTopicInfo.getConsumeOffset && localCurrent.hasNext) {
+      item = localCurrent.next()
+    }
     consumedOffset = item.nextOffset
-    
+
     item.message.ensureValid() // validate checksum of message to ensure it is valid
 
     new MessageAndMetadata(decoder.toEvent(item.message), currentTopicInfo.topic)

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/LogManager.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/LogManager.scala?rev=1406940&r1=1406939&r2=1406940&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/LogManager.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/LogManager.scala Thu Nov  8 06:19:55 2012
@@ -22,7 +22,7 @@ import kafka.utils._
 import scala.collection._
 import kafka.log.Log._
 import kafka.common.{TopicAndPartition, KafkaException}
-import kafka.server.{HighwaterMarkCheckpoint, KafkaConfig}
+import kafka.server.KafkaConfig
 
 
 /**

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/LogSegment.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/LogSegment.scala?rev=1406940&r1=1406939&r2=1406940&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/LogSegment.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/LogSegment.scala Thu Nov  8 06:19:55 2012
@@ -57,7 +57,7 @@ class LogSegment(val messageSet: FileMes
       trace("Inserting %d bytes at offset %d at position %d".format(messages.sizeInBytes, offset, messageSet.sizeInBytes()))
       // append an entry to the index (if needed)
       if(bytesSinceLastIndexEntry > indexIntervalBytes) {
-        index.append(offset, messageSet.sizeInBytes().toInt)
+        index.append(offset, messageSet.sizeInBytes())
         this.bytesSinceLastIndexEntry = 0
       }
       // append the messages
@@ -102,7 +102,7 @@ class LogSegment(val messageSet: FileMes
           val mapping = translateOffset(offset)
           val endPosition = 
             if(mapping == null)
-              messageSet.sizeInBytes().toInt // the max offset is off the end of the log, use the end of the file
+              messageSet.sizeInBytes() // the max offset is off the end of the log, use the end of the file
             else
               mapping.position
           min(endPosition - startPosition.position, maxSize) 
@@ -133,7 +133,7 @@ class LogSegment(val messageSet: FileMes
    * Not that this is expensive.
    */
   def nextOffset(): Long = {
-    val ms = read(index.lastOffset, messageSet.sizeInBytes.toInt, None)
+    val ms = read(index.lastOffset, messageSet.sizeInBytes, None)
     ms.lastOption match {
       case None => start
       case Some(last) => last.nextOffset

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaServer.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaServer.scala?rev=1406940&r1=1406939&r2=1406940&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaServer.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaServer.scala Thu Nov  8 06:19:55 2012
@@ -17,7 +17,6 @@
 
 package kafka.server
 
-import java.io.File
 import kafka.network.SocketServer
 import kafka.log.LogManager
 import kafka.utils._

Added: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala?rev=1406940&view=auto
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala (added)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala Thu Nov  8 06:19:55 2012
@@ -0,0 +1,88 @@
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.consumer
+
+import java.util.concurrent._
+import java.util.concurrent.atomic._
+import scala.collection._
+import junit.framework.Assert._
+
+import kafka.message._
+import kafka.server._
+import kafka.utils.TestUtils._
+import kafka.utils.{TestZKUtils, TestUtils}
+import kafka.admin.CreateTopicCommand
+import org.junit.Test
+import kafka.serializer.DefaultDecoder
+import kafka.cluster.{Broker, Cluster}
+import org.scalatest.junit.JUnit3Suite
+import kafka.integration.KafkaServerTestHarness
+
+class ConsumerIteratorTest extends JUnit3Suite with KafkaServerTestHarness {
+
+  val numNodes = 1
+  val configs =
+    for(props <- TestUtils.createBrokerConfigs(numNodes))
+    yield new KafkaConfig(props) {
+      override val zkConnect = TestZKUtils.zookeeperConnect
+    }
+  val messages = new mutable.HashMap[Int, Seq[Message]]
+  val topic = "topic"
+  val group = "group1"
+  val consumer0 = "consumer0"
+  val cluster = new Cluster(configs.map(c => new Broker(c.brokerId, c.brokerId.toString, "localhost", c.port)))
+  val queue = new LinkedBlockingQueue[FetchedDataChunk]
+  val topicInfos = configs.map(c => new PartitionTopicInfo(topic,
+                                                           c.brokerId,
+                                                           0,
+                                                           queue,
+                                                           new AtomicLong(5),
+                                                           new AtomicLong(0),
+                                                           new AtomicInteger(0)))
+  val consumerConfig = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnect, group, consumer0))
+
+  override def setUp() {
+    super.setUp
+    CreateTopicCommand.createTopic(zkClient, topic, 1, 1, configs.head.brokerId.toString)
+    waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0, 500)
+  }
+
+  @Test
+  def testConsumerIteratorDeduplicationDeepIterator() {
+    val messages = 0.until(10).map(x => new Message((configs(0).brokerId * 5 + x).toString.getBytes)).toList
+    val messageSet = new ByteBufferMessageSet(DefaultCompressionCodec, new AtomicLong(0), messages:_*)
+
+    topicInfos(0).enqueue(messageSet)
+    assertEquals(1, queue.size)
+    queue.put(ZookeeperConsumerConnector.shutdownCommand)
+
+    val iter: ConsumerIterator[Message] = new ConsumerIterator[Message](queue, consumerConfig.consumerTimeoutMs,
+                                                                        new DefaultDecoder, false)
+    var receivedMessages: List[Message] = Nil
+    for (i <- 0 until 5) {
+      assertTrue(iter.hasNext)
+      receivedMessages ::= iter.next.message
+    }
+
+    assertTrue(!iter.hasNext)
+    assertEquals(1, queue.size) // This is only the shutdown command.
+    assertEquals(5, receivedMessages.size)
+    assertEquals(receivedMessages.sortWith((s,t) => s.checksum < t.checksum), messages.takeRight(5).sortWith((s,t) => s.checksum < t.checksum))
+  }
+}

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogManagerTest.scala?rev=1406940&r1=1406939&r2=1406940&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogManagerTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogManagerTest.scala Thu Nov  8 06:19:55 2012
@@ -18,10 +18,8 @@
 package kafka.log
 
 import java.io._
-import java.nio.channels.OverlappingFileLockException
 import junit.framework.Assert._
 import org.junit.Test
-import kafka.common.OffsetOutOfRangeException
 import org.scalatest.junit.JUnit3Suite
 import kafka.server.KafkaConfig
 import kafka.common._

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala?rev=1406940&r1=1406939&r2=1406940&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala Thu Nov  8 06:19:55 2012
@@ -16,7 +16,7 @@
 */
 package kafka.server
 
-import kafka.log.{Log, LogManager}
+import kafka.log.LogManager
 import org.I0Itec.zkclient.ZkClient
 import org.scalatest.junit.JUnit3Suite
 import org.easymock.EasyMock