You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2013/03/07 01:09:10 UTC

git commit: MirrorMaker with shallow.iterator.enable=true produces unreadble messages; patched by Jun Rao; reviewed by Neha Narkhede; kafka-732

Updated Branches:
  refs/heads/0.8 eae1bd52e -> 771760ce2


MirrorMaker with shallow.iterator.enable=true produces unreadble messages; patched by Jun Rao; reviewed by Neha Narkhede; kafka-732


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/771760ce
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/771760ce
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/771760ce

Branch: refs/heads/0.8
Commit: 771760ce23f00ba86b916420d8e209b2611b23c0
Parents: eae1bd5
Author: Jun Rao <ju...@gmail.com>
Authored: Wed Mar 6 16:08:56 2013 -0800
Committer: Jun Rao <ju...@gmail.com>
Committed: Wed Mar 6 16:08:56 2013 -0800

----------------------------------------------------------------------
 .../main/scala/kafka/consumer/ConsumerConfig.scala |    6 ------
 .../scala/kafka/consumer/ConsumerIterator.scala    |    7 +------
 .../main/scala/kafka/consumer/KafkaStream.scala    |    3 +--
 .../consumer/ZookeeperConsumerConnector.scala      |    3 +--
 .../unit/kafka/consumer/ConsumerIteratorTest.scala |    1 -
 5 files changed, 3 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/771760ce/core/src/main/scala/kafka/consumer/ConsumerConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/ConsumerConfig.scala b/core/src/main/scala/kafka/consumer/ConsumerConfig.scala
index 45db07b..2ebd72a 100644
--- a/core/src/main/scala/kafka/consumer/ConsumerConfig.scala
+++ b/core/src/main/scala/kafka/consumer/ConsumerConfig.scala
@@ -125,12 +125,6 @@ class ConsumerConfig private (val props: VerifiableProperties) extends ZKConfig(
   /** throw a timeout exception to the consumer if no message is available for consumption after the specified interval */
   val consumerTimeoutMs = props.getInt("consumer.timeout.ms", ConsumerTimeoutMs)
 
-  /** Use shallow iterator over compressed messages directly. This feature should be used very carefully.
-   *  Typically, it's only used for mirroring raw messages from one kafka cluster to another to save the
-   *  overhead of decompression.
-   *  */
-  val shallowIteratorEnable = props.getBoolean("shallow.iterator.enable", false)
-
   /**
    * Client id is specified by the kafka consumer client, used to distinguish different clients
    */

http://git-wip-us.apache.org/repos/asf/kafka/blob/771760ce/core/src/main/scala/kafka/consumer/ConsumerIterator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/ConsumerIterator.scala b/core/src/main/scala/kafka/consumer/ConsumerIterator.scala
index 746a4bd..a504534 100644
--- a/core/src/main/scala/kafka/consumer/ConsumerIterator.scala
+++ b/core/src/main/scala/kafka/consumer/ConsumerIterator.scala
@@ -34,7 +34,6 @@ class ConsumerIterator[K, V](private val channel: BlockingQueue[FetchedDataChunk
                              consumerTimeoutMs: Int,
                              private val keyDecoder: Decoder[K],
                              private val valueDecoder: Decoder[V],
-                             val enableShallowIterator: Boolean,
                              val clientId: String)
   extends IteratorTemplate[MessageAndMetadata[K, V]] with Logging {
 
@@ -83,11 +82,7 @@ class ConsumerIterator[K, V](private val channel: BlockingQueue[FetchedDataChunk
             .format(ctiConsumeOffset, cdcFetchOffset, currentTopicInfo))
           currentTopicInfo.resetConsumeOffset(cdcFetchOffset)
         }
-        localCurrent =
-          if (enableShallowIterator)
-            currentDataChunk.messages.shallowIterator
-          else
-            currentDataChunk.messages.iterator
+        localCurrent = currentDataChunk.messages.iterator
 
         current.set(localCurrent)
       }

http://git-wip-us.apache.org/repos/asf/kafka/blob/771760ce/core/src/main/scala/kafka/consumer/KafkaStream.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/KafkaStream.scala b/core/src/main/scala/kafka/consumer/KafkaStream.scala
index d4e0e96..31eaf86 100644
--- a/core/src/main/scala/kafka/consumer/KafkaStream.scala
+++ b/core/src/main/scala/kafka/consumer/KafkaStream.scala
@@ -26,12 +26,11 @@ class KafkaStream[K,V](private val queue: BlockingQueue[FetchedDataChunk],
                         consumerTimeoutMs: Int,
                         private val keyDecoder: Decoder[K],
                         private val valueDecoder: Decoder[V],
-                        val enableShallowIterator: Boolean,
                         val clientId: String)
    extends Iterable[MessageAndMetadata[K,V]] with java.lang.Iterable[MessageAndMetadata[K,V]] {
 
   private val iter: ConsumerIterator[K,V] =
-    new ConsumerIterator[K,V](queue, consumerTimeoutMs, keyDecoder, valueDecoder, enableShallowIterator, clientId)
+    new ConsumerIterator[K,V](queue, consumerTimeoutMs, keyDecoder, valueDecoder, clientId)
 
   /**
    *  Create an iterator over messages in the stream.

http://git-wip-us.apache.org/repos/asf/kafka/blob/771760ce/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
index b266f3f..dcbcf21 100644
--- a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
+++ b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
@@ -195,7 +195,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
       threadIdSet.map(_ => {
         val queue =  new LinkedBlockingQueue[FetchedDataChunk](config.queuedMaxMessages)
         val stream = new KafkaStream[K,V](
-          queue, config.consumerTimeoutMs, keyDecoder, valueDecoder, config.shallowIteratorEnable, config.clientId)
+          queue, config.consumerTimeoutMs, keyDecoder, valueDecoder, config.clientId)
         (queue, stream)
       })
     ).flatten.toList
@@ -695,7 +695,6 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
                                           config.consumerTimeoutMs, 
                                           keyDecoder, 
                                           valueDecoder, 
-                                          config.shallowIteratorEnable,
                                           config.clientId)
         (queue, stream)
     }).toList

http://git-wip-us.apache.org/repos/asf/kafka/blob/771760ce/core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala b/core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala
index 8ae30ea..1ee34b9 100644
--- a/core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala
+++ b/core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala
@@ -78,7 +78,6 @@ class ConsumerIteratorTest extends JUnit3Suite with KafkaServerTestHarness {
                                                     consumerConfig.consumerTimeoutMs,
                                                     new StringDecoder(), 
                                                     new StringDecoder(),
-                                                    enableShallowIterator = false,
                                                     clientId = "")
     val receivedMessages = (0 until 5).map(i => iter.next.message).toList