You are viewing a plain text version of this content. The canonical link to the original was not preserved in this plain-text copy.
Posted to commits@kafka.apache.org by gu...@apache.org on 2015/03/19 23:53:03 UTC

kafka git commit: KAFKA-1910; Fix two bugs on MemoryRecords and KafkaConsumer; reviewed by Onur Karaman

Repository: kafka
Updated Branches:
  refs/heads/trunk 82789e751 -> b2c833aa4


KAFKA-1910; Fix two bugs on MemoryRecords and KafkaConsumer; reviewed by Onur Karaman


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/b2c833aa
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/b2c833aa
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/b2c833aa

Branch: refs/heads/trunk
Commit: b2c833aa41cb9a7a6232781b273402042e021607
Parents: 82789e7
Author: Guozhang Wang <wa...@gmail.com>
Authored: Thu Mar 19 15:52:54 2015 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Thu Mar 19 15:52:54 2015 -0700

----------------------------------------------------------------------
 .../apache/kafka/clients/consumer/KafkaConsumer.java   | 13 ++++++-------
 .../kafka/clients/consumer/internals/Fetcher.java      |  2 +-
 .../org/apache/kafka/common/record/MemoryRecords.java  |  6 ++++--
 .../java/org/apache/kafka/common/record/Record.java    |  8 +++++---
 4 files changed, 16 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/b2c833aa/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index 2e24653..c7bc56c 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -354,7 +354,6 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
     private final Metrics metrics;
     private final SubscriptionState subscriptions;
     private final Metadata metadata;
-    private final Heartbeat heartbeat;
     private final long retryBackoffMs;
     private final boolean autoCommit;
     private final long autoCommitIntervalMs;
@@ -446,7 +445,6 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
         else
             this.rebalanceCallback = callback;
         this.time = new SystemTime();
-        this.heartbeat = new Heartbeat(config.getLong(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG), time.milliseconds());
         this.autoCommit = config.getBoolean(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG);
         this.autoCommitIntervalMs = config.getLong(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG);
         this.lastCommitAttemptMs = time.milliseconds();
@@ -538,7 +536,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
     @Override
     public synchronized void subscribe(String... topics) {
         ensureNotClosed();
-        log.debug("Subscribed to topic(s): ", Utils.join(topics, ", "));
+        log.debug("Subscribed to topic(s): {}", Utils.join(topics, ", "));
         for (String topic : topics)
             this.subscriptions.subscribe(topic);
         metadata.addTopics(topics);
@@ -555,7 +553,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
     @Override
     public synchronized void subscribe(TopicPartition... partitions) {
         ensureNotClosed();
-        log.debug("Subscribed to partitions(s): ", Utils.join(partitions, ", "));
+        log.debug("Subscribed to partitions(s): {}", Utils.join(partitions, ", "));
         for (TopicPartition tp : partitions) {
             this.subscriptions.subscribe(tp);
             metadata.addTopics(tp.topic());
@@ -570,7 +568,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
      */
     public synchronized void unsubscribe(String... topics) {
         ensureNotClosed();
-        log.debug("Unsubscribed from topic(s): ", Utils.join(topics, ", "));
+        log.debug("Unsubscribed from topic(s): {}", Utils.join(topics, ", "));
         // throw an exception if the topic was never subscribed to
         for (String topic : topics)
             this.subscriptions.unsubscribe(topic);
@@ -584,7 +582,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
      */
     public synchronized void unsubscribe(TopicPartition... partitions) {
         ensureNotClosed();
-        log.debug("Unsubscribed from partitions(s): ", Utils.join(partitions, ", "));
+        log.debug("Unsubscribed from partitions(s): {}", Utils.join(partitions, ", "));
         // throw an exception if the partition was never subscribed to
         for (TopicPartition partition : partitions)
             this.subscriptions.unsubscribe(partition);
@@ -878,7 +876,8 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
                     // if the committed position is unknown reset the position
                     fetcher.resetOffset(tp);
                 } else {
-                    log.debug("Resetting offset for partition {} to committed offset");
+                    log.debug("Resetting offset for partition {} to the committed offset {}",
+                        tp, subscriptions.committed(tp));
                     subscriptions.seek(tp, subscriptions.committed(tp));
                 }
             }

http://git-wip-us.apache.org/repos/asf/kafka/blob/b2c833aa/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
index 8b71fba..ef9dd52 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
@@ -258,7 +258,7 @@ public class Fetcher<K, V> {
             Node node = cluster.leaderFor(partition);
             // if there is a leader and no in-flight requests, issue a new fetch
             if (node != null && this.client.inFlightRequestCount(node.id()) == 0) {
-                Map<TopicPartition, FetchRequest.PartitionData> fetch = fetchable.get(node);
+                Map<TopicPartition, FetchRequest.PartitionData> fetch = fetchable.get(node.id());
                 if (fetch == null) {
                     fetch = new HashMap<TopicPartition, FetchRequest.PartitionData>();
                     fetchable.put(node.id(), fetch);

http://git-wip-us.apache.org/repos/asf/kafka/blob/b2c833aa/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
index c049bff..b2db240 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
@@ -234,7 +234,7 @@ public class MemoryRecords implements Records {
                         rec.limit(size);
                     } else {
                         byte[] recordBuffer = new byte[size];
-                        stream.read(recordBuffer, 0, size);
+                        stream.readFully(recordBuffer, 0, size);
                         rec = ByteBuffer.wrap(recordBuffer);
                     }
                     LogEntry entry = new LogEntry(offset, new Record(rec));
@@ -245,7 +245,9 @@ public class MemoryRecords implements Records {
                         return entry;
                     } else {
                         // init the inner iterator with the value payload of the message,
-                        // which will de-compress the payload to a set of messages
+                        // which will de-compress the payload to a set of messages;
+                        // since we assume nested compression is not allowed, the deep iterator
+                        // would not try to further decompress underlying messages
                         ByteBuffer value = entry.record().value();
                         innerIter = new RecordsIterator(value, compression, true);
                         return innerIter.next();

http://git-wip-us.apache.org/repos/asf/kafka/blob/b2c833aa/clients/src/main/java/org/apache/kafka/common/record/Record.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/Record.java b/clients/src/main/java/org/apache/kafka/common/record/Record.java
index 10df9fd..197d60e 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/Record.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/Record.java
@@ -317,12 +317,14 @@ public final class Record {
     }
 
     public String toString() {
-        return String.format("Record(magic = %d, attributes = %d, crc = %d, key = %d bytes, value = %d bytes)",
+        return String.format("Record(magic = %d, attributes = %d, compression = %s, crc = %d, key = %d bytes, value = %d bytes)",
                              magic(),
                              attributes(),
+                             compressionType(),
                              checksum(),
-                             key().limit(),
-                             value().limit());
+                             key() == null ? 0 : key().limit(),
+                             value() == null ? 0: value().limit());
+
     }
 
     public boolean equals(Object other) {