You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2013/03/07 01:09:10 UTC
git commit: MirrorMaker with shallow.iterator.enable=true produces
unreadable messages; patched by Jun Rao; reviewed by Neha Narkhede; kafka-732
Updated Branches:
refs/heads/0.8 eae1bd52e -> 771760ce2
MirrorMaker with shallow.iterator.enable=true produces unreadable messages; patched by Jun Rao; reviewed by Neha Narkhede; kafka-732
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/771760ce
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/771760ce
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/771760ce
Branch: refs/heads/0.8
Commit: 771760ce23f00ba86b916420d8e209b2611b23c0
Parents: eae1bd5
Author: Jun Rao <ju...@gmail.com>
Authored: Wed Mar 6 16:08:56 2013 -0800
Committer: Jun Rao <ju...@gmail.com>
Committed: Wed Mar 6 16:08:56 2013 -0800
----------------------------------------------------------------------
.../main/scala/kafka/consumer/ConsumerConfig.scala | 6 ------
.../scala/kafka/consumer/ConsumerIterator.scala | 7 +------
.../main/scala/kafka/consumer/KafkaStream.scala | 3 +--
.../consumer/ZookeeperConsumerConnector.scala | 3 +--
.../unit/kafka/consumer/ConsumerIteratorTest.scala | 1 -
5 files changed, 3 insertions(+), 17 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/771760ce/core/src/main/scala/kafka/consumer/ConsumerConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/ConsumerConfig.scala b/core/src/main/scala/kafka/consumer/ConsumerConfig.scala
index 45db07b..2ebd72a 100644
--- a/core/src/main/scala/kafka/consumer/ConsumerConfig.scala
+++ b/core/src/main/scala/kafka/consumer/ConsumerConfig.scala
@@ -125,12 +125,6 @@ class ConsumerConfig private (val props: VerifiableProperties) extends ZKConfig(
/** throw a timeout exception to the consumer if no message is available for consumption after the specified interval */
val consumerTimeoutMs = props.getInt("consumer.timeout.ms", ConsumerTimeoutMs)
- /** Use shallow iterator over compressed messages directly. This feature should be used very carefully.
- * Typically, it's only used for mirroring raw messages from one kafka cluster to another to save the
- * overhead of decompression.
- * */
- val shallowIteratorEnable = props.getBoolean("shallow.iterator.enable", false)
-
/**
* Client id is specified by the kafka consumer client, used to distinguish different clients
*/
http://git-wip-us.apache.org/repos/asf/kafka/blob/771760ce/core/src/main/scala/kafka/consumer/ConsumerIterator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/ConsumerIterator.scala b/core/src/main/scala/kafka/consumer/ConsumerIterator.scala
index 746a4bd..a504534 100644
--- a/core/src/main/scala/kafka/consumer/ConsumerIterator.scala
+++ b/core/src/main/scala/kafka/consumer/ConsumerIterator.scala
@@ -34,7 +34,6 @@ class ConsumerIterator[K, V](private val channel: BlockingQueue[FetchedDataChunk
consumerTimeoutMs: Int,
private val keyDecoder: Decoder[K],
private val valueDecoder: Decoder[V],
- val enableShallowIterator: Boolean,
val clientId: String)
extends IteratorTemplate[MessageAndMetadata[K, V]] with Logging {
@@ -83,11 +82,7 @@ class ConsumerIterator[K, V](private val channel: BlockingQueue[FetchedDataChunk
.format(ctiConsumeOffset, cdcFetchOffset, currentTopicInfo))
currentTopicInfo.resetConsumeOffset(cdcFetchOffset)
}
- localCurrent =
- if (enableShallowIterator)
- currentDataChunk.messages.shallowIterator
- else
- currentDataChunk.messages.iterator
+ localCurrent = currentDataChunk.messages.iterator
current.set(localCurrent)
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/771760ce/core/src/main/scala/kafka/consumer/KafkaStream.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/KafkaStream.scala b/core/src/main/scala/kafka/consumer/KafkaStream.scala
index d4e0e96..31eaf86 100644
--- a/core/src/main/scala/kafka/consumer/KafkaStream.scala
+++ b/core/src/main/scala/kafka/consumer/KafkaStream.scala
@@ -26,12 +26,11 @@ class KafkaStream[K,V](private val queue: BlockingQueue[FetchedDataChunk],
consumerTimeoutMs: Int,
private val keyDecoder: Decoder[K],
private val valueDecoder: Decoder[V],
- val enableShallowIterator: Boolean,
val clientId: String)
extends Iterable[MessageAndMetadata[K,V]] with java.lang.Iterable[MessageAndMetadata[K,V]] {
private val iter: ConsumerIterator[K,V] =
- new ConsumerIterator[K,V](queue, consumerTimeoutMs, keyDecoder, valueDecoder, enableShallowIterator, clientId)
+ new ConsumerIterator[K,V](queue, consumerTimeoutMs, keyDecoder, valueDecoder, clientId)
/**
* Create an iterator over messages in the stream.
http://git-wip-us.apache.org/repos/asf/kafka/blob/771760ce/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
index b266f3f..dcbcf21 100644
--- a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
+++ b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
@@ -195,7 +195,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
threadIdSet.map(_ => {
val queue = new LinkedBlockingQueue[FetchedDataChunk](config.queuedMaxMessages)
val stream = new KafkaStream[K,V](
- queue, config.consumerTimeoutMs, keyDecoder, valueDecoder, config.shallowIteratorEnable, config.clientId)
+ queue, config.consumerTimeoutMs, keyDecoder, valueDecoder, config.clientId)
(queue, stream)
})
).flatten.toList
@@ -695,7 +695,6 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
config.consumerTimeoutMs,
keyDecoder,
valueDecoder,
- config.shallowIteratorEnable,
config.clientId)
(queue, stream)
}).toList
http://git-wip-us.apache.org/repos/asf/kafka/blob/771760ce/core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala b/core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala
index 8ae30ea..1ee34b9 100644
--- a/core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala
+++ b/core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala
@@ -78,7 +78,6 @@ class ConsumerIteratorTest extends JUnit3Suite with KafkaServerTestHarness {
consumerConfig.consumerTimeoutMs,
new StringDecoder(),
new StringDecoder(),
- enableShallowIterator = false,
clientId = "")
val receivedMessages = (0 until 5).map(i => iter.next.message).toList