You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2015/03/19 23:53:03 UTC
kafka git commit: KAFKA-1910; Fix two bugs on MemoryRecords and KafkaConsumer; reviewed by Onur Karaman
Repository: kafka
Updated Branches:
refs/heads/trunk 82789e751 -> b2c833aa4
KAFKA-1910; Fix two bugs on MemoryRecords and KafkaConsumer; reviewed by Onur Karaman
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/b2c833aa
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/b2c833aa
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/b2c833aa
Branch: refs/heads/trunk
Commit: b2c833aa41cb9a7a6232781b273402042e021607
Parents: 82789e7
Author: Guozhang Wang <wa...@gmail.com>
Authored: Thu Mar 19 15:52:54 2015 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Thu Mar 19 15:52:54 2015 -0700
----------------------------------------------------------------------
.../apache/kafka/clients/consumer/KafkaConsumer.java | 13 ++++++-------
.../kafka/clients/consumer/internals/Fetcher.java | 2 +-
.../org/apache/kafka/common/record/MemoryRecords.java | 6 ++++--
.../java/org/apache/kafka/common/record/Record.java | 8 +++++---
4 files changed, 16 insertions(+), 13 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/b2c833aa/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index 2e24653..c7bc56c 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -354,7 +354,6 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
private final Metrics metrics;
private final SubscriptionState subscriptions;
private final Metadata metadata;
- private final Heartbeat heartbeat;
private final long retryBackoffMs;
private final boolean autoCommit;
private final long autoCommitIntervalMs;
@@ -446,7 +445,6 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
else
this.rebalanceCallback = callback;
this.time = new SystemTime();
- this.heartbeat = new Heartbeat(config.getLong(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG), time.milliseconds());
this.autoCommit = config.getBoolean(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG);
this.autoCommitIntervalMs = config.getLong(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG);
this.lastCommitAttemptMs = time.milliseconds();
@@ -538,7 +536,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
@Override
public synchronized void subscribe(String... topics) {
ensureNotClosed();
- log.debug("Subscribed to topic(s): ", Utils.join(topics, ", "));
+ log.debug("Subscribed to topic(s): {}", Utils.join(topics, ", "));
for (String topic : topics)
this.subscriptions.subscribe(topic);
metadata.addTopics(topics);
@@ -555,7 +553,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
@Override
public synchronized void subscribe(TopicPartition... partitions) {
ensureNotClosed();
- log.debug("Subscribed to partitions(s): ", Utils.join(partitions, ", "));
+ log.debug("Subscribed to partitions(s): {}", Utils.join(partitions, ", "));
for (TopicPartition tp : partitions) {
this.subscriptions.subscribe(tp);
metadata.addTopics(tp.topic());
@@ -570,7 +568,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
*/
public synchronized void unsubscribe(String... topics) {
ensureNotClosed();
- log.debug("Unsubscribed from topic(s): ", Utils.join(topics, ", "));
+ log.debug("Unsubscribed from topic(s): {}", Utils.join(topics, ", "));
// throw an exception if the topic was never subscribed to
for (String topic : topics)
this.subscriptions.unsubscribe(topic);
@@ -584,7 +582,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
*/
public synchronized void unsubscribe(TopicPartition... partitions) {
ensureNotClosed();
- log.debug("Unsubscribed from partitions(s): ", Utils.join(partitions, ", "));
+ log.debug("Unsubscribed from partitions(s): {}", Utils.join(partitions, ", "));
// throw an exception if the partition was never subscribed to
for (TopicPartition partition : partitions)
this.subscriptions.unsubscribe(partition);
@@ -878,7 +876,8 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
// if the committed position is unknown reset the position
fetcher.resetOffset(tp);
} else {
- log.debug("Resetting offset for partition {} to committed offset");
+ log.debug("Resetting offset for partition {} to the committed offset {}",
+ tp, subscriptions.committed(tp));
subscriptions.seek(tp, subscriptions.committed(tp));
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/b2c833aa/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
index 8b71fba..ef9dd52 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
@@ -258,7 +258,7 @@ public class Fetcher<K, V> {
Node node = cluster.leaderFor(partition);
// if there is a leader and no in-flight requests, issue a new fetch
if (node != null && this.client.inFlightRequestCount(node.id()) == 0) {
- Map<TopicPartition, FetchRequest.PartitionData> fetch = fetchable.get(node);
+ Map<TopicPartition, FetchRequest.PartitionData> fetch = fetchable.get(node.id());
if (fetch == null) {
fetch = new HashMap<TopicPartition, FetchRequest.PartitionData>();
fetchable.put(node.id(), fetch);
http://git-wip-us.apache.org/repos/asf/kafka/blob/b2c833aa/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
index c049bff..b2db240 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
@@ -234,7 +234,7 @@ public class MemoryRecords implements Records {
rec.limit(size);
} else {
byte[] recordBuffer = new byte[size];
- stream.read(recordBuffer, 0, size);
+ stream.readFully(recordBuffer, 0, size);
rec = ByteBuffer.wrap(recordBuffer);
}
LogEntry entry = new LogEntry(offset, new Record(rec));
@@ -245,7 +245,9 @@ public class MemoryRecords implements Records {
return entry;
} else {
// init the inner iterator with the value payload of the message,
- // which will de-compress the payload to a set of messages
+ // which will de-compress the payload to a set of messages;
+ // since we assume nested compression is not allowed, the deep iterator
+ // would not try to further decompress underlying messages
ByteBuffer value = entry.record().value();
innerIter = new RecordsIterator(value, compression, true);
return innerIter.next();
http://git-wip-us.apache.org/repos/asf/kafka/blob/b2c833aa/clients/src/main/java/org/apache/kafka/common/record/Record.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/Record.java b/clients/src/main/java/org/apache/kafka/common/record/Record.java
index 10df9fd..197d60e 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/Record.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/Record.java
@@ -317,12 +317,14 @@ public final class Record {
}
public String toString() {
- return String.format("Record(magic = %d, attributes = %d, crc = %d, key = %d bytes, value = %d bytes)",
+ return String.format("Record(magic = %d, attributes = %d, compression = %s, crc = %d, key = %d bytes, value = %d bytes)",
magic(),
attributes(),
+ compressionType(),
checksum(),
- key().limit(),
- value().limit());
+ key() == null ? 0 : key().limit(),
+ value() == null ? 0: value().limit());
+
}
public boolean equals(Object other) {