You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2012/11/08 07:19:55 UTC
svn commit: r1406940 - in /incubator/kafka/branches/0.8/core/src:
main/scala/kafka/consumer/ main/scala/kafka/log/ main/scala/kafka/server/
test/scala/unit/kafka/consumer/ test/scala/unit/kafka/log/
test/scala/unit/kafka/server/
Author: junrao
Date: Thu Nov 8 06:19:55 2012
New Revision: 1406940
URL: http://svn.apache.org/viewvc?rev=1406940&view=rev
Log:
Fix commit() in zk consumer for compressed messages; patched by Swapnil Ghike; reviewed by Jun Rao; KAFKA-546
Added:
incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala
Modified:
incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerIterator.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/LogManager.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/LogSegment.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaServer.scala
incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerIterator.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerIterator.scala?rev=1406940&r1=1406939&r2=1406940&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerIterator.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerIterator.scala Thu Nov 8 06:19:55 2012
@@ -73,13 +73,19 @@ class ConsumerIterator[T](private val ch
return allDone
} else {
currentTopicInfo = currentDataChunk.topicInfo
- if (currentTopicInfo.getConsumeOffset != currentDataChunk.fetchOffset) {
+ val cdcFetchOffset = currentDataChunk.fetchOffset
+ val ctiConsumeOffset = currentTopicInfo.getConsumeOffset
+ if (ctiConsumeOffset < cdcFetchOffset) {
error("consumed offset: %d doesn't match fetch offset: %d for %s;\n Consumer may lose data"
- .format(currentTopicInfo.getConsumeOffset, currentDataChunk.fetchOffset, currentTopicInfo))
- currentTopicInfo.resetConsumeOffset(currentDataChunk.fetchOffset)
+ .format(ctiConsumeOffset, cdcFetchOffset, currentTopicInfo))
+ currentTopicInfo.resetConsumeOffset(cdcFetchOffset)
}
- localCurrent = if (enableShallowIterator) currentDataChunk.messages.shallowIterator
- else currentDataChunk.messages.iterator
+ localCurrent =
+ if (enableShallowIterator)
+ currentDataChunk.messages.shallowIterator
+ else
+ currentDataChunk.messages.iterator
+
current.set(localCurrent)
}
// if we just updated the current chunk and it is empty that means the fetch size is too small!
@@ -88,9 +94,13 @@ class ConsumerIterator[T](private val ch
"%s partition %d at fetch offset %d. Increase the fetch size, or decrease the maximum message size the broker will allow."
.format(currentDataChunk.topicInfo.topic, currentDataChunk.topicInfo.partitionId, currentDataChunk.fetchOffset))
}
- val item = localCurrent.next()
+ var item = localCurrent.next()
+ // reject the messages that have already been consumed
+ while (item.offset < currentTopicInfo.getConsumeOffset && localCurrent.hasNext) {
+ item = localCurrent.next()
+ }
consumedOffset = item.nextOffset
-
+
item.message.ensureValid() // validate checksum of message to ensure it is valid
new MessageAndMetadata(decoder.toEvent(item.message), currentTopicInfo.topic)
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/LogManager.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/LogManager.scala?rev=1406940&r1=1406939&r2=1406940&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/LogManager.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/LogManager.scala Thu Nov 8 06:19:55 2012
@@ -22,7 +22,7 @@ import kafka.utils._
import scala.collection._
import kafka.log.Log._
import kafka.common.{TopicAndPartition, KafkaException}
-import kafka.server.{HighwaterMarkCheckpoint, KafkaConfig}
+import kafka.server.KafkaConfig
/**
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/LogSegment.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/LogSegment.scala?rev=1406940&r1=1406939&r2=1406940&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/LogSegment.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/LogSegment.scala Thu Nov 8 06:19:55 2012
@@ -57,7 +57,7 @@ class LogSegment(val messageSet: FileMes
trace("Inserting %d bytes at offset %d at position %d".format(messages.sizeInBytes, offset, messageSet.sizeInBytes()))
// append an entry to the index (if needed)
if(bytesSinceLastIndexEntry > indexIntervalBytes) {
- index.append(offset, messageSet.sizeInBytes().toInt)
+ index.append(offset, messageSet.sizeInBytes())
this.bytesSinceLastIndexEntry = 0
}
// append the messages
@@ -102,7 +102,7 @@ class LogSegment(val messageSet: FileMes
val mapping = translateOffset(offset)
val endPosition =
if(mapping == null)
- messageSet.sizeInBytes().toInt // the max offset is off the end of the log, use the end of the file
+ messageSet.sizeInBytes() // the max offset is off the end of the log, use the end of the file
else
mapping.position
min(endPosition - startPosition.position, maxSize)
@@ -133,7 +133,7 @@ class LogSegment(val messageSet: FileMes
* Not that this is expensive.
*/
def nextOffset(): Long = {
- val ms = read(index.lastOffset, messageSet.sizeInBytes.toInt, None)
+ val ms = read(index.lastOffset, messageSet.sizeInBytes, None)
ms.lastOption match {
case None => start
case Some(last) => last.nextOffset
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaServer.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaServer.scala?rev=1406940&r1=1406939&r2=1406940&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaServer.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaServer.scala Thu Nov 8 06:19:55 2012
@@ -17,7 +17,6 @@
package kafka.server
-import java.io.File
import kafka.network.SocketServer
import kafka.log.LogManager
import kafka.utils._
Added: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala?rev=1406940&view=auto
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala (added)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala Thu Nov 8 06:19:55 2012
@@ -0,0 +1,88 @@
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.consumer
+
+import java.util.concurrent._
+import java.util.concurrent.atomic._
+import scala.collection._
+import junit.framework.Assert._
+
+import kafka.message._
+import kafka.server._
+import kafka.utils.TestUtils._
+import kafka.utils.{TestZKUtils, TestUtils}
+import kafka.admin.CreateTopicCommand
+import org.junit.Test
+import kafka.serializer.DefaultDecoder
+import kafka.cluster.{Broker, Cluster}
+import org.scalatest.junit.JUnit3Suite
+import kafka.integration.KafkaServerTestHarness
+
+/**
+ * Checks that a deep (non-shallow) ConsumerIterator drops messages from a compressed
+ * message set whose offsets are below the partition's current consume offset, so that
+ * re-delivered compressed batches are not handed to the application twice.
+ */
+class ConsumerIteratorTest extends JUnit3Suite with KafkaServerTestHarness {
+
+  val numNodes = 1
+  val configs =
+    for(props <- TestUtils.createBrokerConfigs(numNodes))
+    yield new KafkaConfig(props) {
+      override val zkConnect = TestZKUtils.zookeeperConnect
+    }
+  // NOTE(review): not referenced by any test in this class; candidate for removal.
+  val messages = new mutable.HashMap[Int, Seq[Message]]
+  val topic = "topic"
+  val group = "group1"
+  val consumer0 = "consumer0"
+  // NOTE(review): not referenced by any test in this class; candidate for removal.
+  val cluster = new Cluster(configs.map(c => new Broker(c.brokerId, c.brokerId.toString, "localhost", c.port)))
+  val queue = new LinkedBlockingQueue[FetchedDataChunk]
+  // The consume offset starts at 5, so the first five messages of the batch below count as
+  // already consumed. This assumes the 5th constructor argument is the consumed offset and the
+  // 6th is the fetched offset (TODO confirm against PartitionTopicInfo).
+  val topicInfos = configs.map(c => new PartitionTopicInfo(topic,
+                                                           c.brokerId,
+                                                           0,
+                                                           queue,
+                                                           new AtomicLong(5),
+                                                           new AtomicLong(0),
+                                                           new AtomicInteger(0)))
+  val consumerConfig = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnect, group, consumer0))
+
+  override def setUp() {
+    super.setUp
+    CreateTopicCommand.createTopic(zkClient, topic, 1, 1, configs.head.brokerId.toString)
+    waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0, 500)
+  }
+
+  @Test
+  def testConsumerIteratorDeduplicationDeepIterator() {
+    // Ten messages compressed into a single wrapper. Offsets are presumably assigned from 0.
+    val sentMessages = 0.until(10).map(x => new Message((configs(0).brokerId * 5 + x).toString.getBytes)).toList
+    val messageSet = new ByteBufferMessageSet(DefaultCompressionCodec, new AtomicLong(0), sentMessages:_*)
+
+    topicInfos(0).enqueue(messageSet)
+    assertEquals(1, queue.size)
+    // The shutdown sentinel makes hasNext return false once the data chunk is exhausted.
+    queue.put(ZookeeperConsumerConnector.shutdownCommand)
+
+    // The final `false` presumably disables the shallow iterator, i.e. selects deep iteration.
+    val iter: ConsumerIterator[Message] = new ConsumerIterator[Message](queue, consumerConfig.consumerTimeoutMs,
+                                                                        new DefaultDecoder, false)
+    var receivedMessages: List[Message] = Nil
+    for (_ <- 0 until 5) {
+      assertTrue(iter.hasNext)
+      receivedMessages ::= iter.next.message
+    }
+
+    assertFalse(iter.hasNext)
+    assertEquals(1, queue.size) // This is only the shutdown command.
+    assertEquals(5, receivedMessages.size)
+    // Only the not-yet-consumed tail must be delivered. Comparing sorted lists keeps this
+    // check independent of delivery order. JUnit expects (expected, actual).
+    val byChecksum = (s: Message, t: Message) => s.checksum < t.checksum
+    assertEquals(sentMessages.takeRight(5).sortWith(byChecksum), receivedMessages.sortWith(byChecksum))
+  }
+}
Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogManagerTest.scala?rev=1406940&r1=1406939&r2=1406940&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogManagerTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogManagerTest.scala Thu Nov 8 06:19:55 2012
@@ -18,10 +18,8 @@
package kafka.log
import java.io._
-import java.nio.channels.OverlappingFileLockException
import junit.framework.Assert._
import org.junit.Test
-import kafka.common.OffsetOutOfRangeException
import org.scalatest.junit.JUnit3Suite
import kafka.server.KafkaConfig
import kafka.common._
Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala?rev=1406940&r1=1406939&r2=1406940&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala Thu Nov 8 06:19:55 2012
@@ -16,7 +16,7 @@
*/
package kafka.server
-import kafka.log.{Log, LogManager}
+import kafka.log.LogManager
import org.I0Itec.zkclient.ZkClient
import org.scalatest.junit.JUnit3Suite
import org.easymock.EasyMock