You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ij...@apache.org on 2017/03/30 21:39:32 UTC
kafka git commit: MINOR: Ensure streaming iterator is closed by
Fetcher
Repository: kafka
Updated Branches:
refs/heads/trunk f9772d5fb -> dd71e4a8d
MINOR: Ensure streaming iterator is closed by Fetcher
Author: Jason Gustafson <ja...@confluent.io>
Author: Ismael Juma <gi...@juma.me.uk>
Reviewers: Ismael Juma <is...@juma.me.uk>
Closes #2762 from hachikuji/ensure-decompression-stream-closed
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/dd71e4a8
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/dd71e4a8
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/dd71e4a8
Branch: refs/heads/trunk
Commit: dd71e4a8d830c9de40b5ec3f987f60a1d2f26b39
Parents: f9772d5
Author: Jason Gustafson <ja...@confluent.io>
Authored: Thu Mar 30 22:39:28 2017 +0100
Committer: Ismael Juma <is...@juma.me.uk>
Committed: Thu Mar 30 22:39:28 2017 +0100
----------------------------------------------------------------------
.../kafka/clients/consumer/KafkaConsumer.java | 1 +
.../clients/consumer/internals/Fetcher.java | 40 +-
.../record/AbstractLegacyRecordBatch.java | 46 +-
.../kafka/common/record/DefaultRecord.java | 41 ++
.../kafka/common/record/DefaultRecordBatch.java | 45 +-
.../kafka/common/record/FileLogInputStream.java | 7 +
.../kafka/common/record/LegacyRecord.java | 4 +-
.../apache/kafka/common/record/RecordBatch.java | 11 +
.../kafka/common/record/SimpleRecord.java | 8 +
.../kafka/common/utils/CloseableIterator.java | 30 ++
.../clients/consumer/internals/FetcherTest.java | 446 ++++++++++++-------
.../common/record/DefaultRecordBatchTest.java | 14 +
.../java/org/apache/kafka/test/TestUtils.java | 5 +
13 files changed, 516 insertions(+), 182 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/dd71e4a8/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index a666540..15434bb 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -1567,6 +1567,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
firstException.compareAndSet(null, t);
log.error("Failed to close coordinator", t);
}
+ ClientUtils.closeQuietly(fetcher, "fetcher", firstException);
ClientUtils.closeQuietly(interceptors, "consumer interceptors", firstException);
ClientUtils.closeQuietly(metrics, "consumer metrics", firstException);
ClientUtils.closeQuietly(client, "consumer network client", firstException);
http://git-wip-us.apache.org/repos/asf/kafka/blob/dd71e4a8/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
index ad63d25..2eeef11 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
@@ -45,8 +45,8 @@ import org.apache.kafka.common.metrics.stats.Rate;
import org.apache.kafka.common.metrics.stats.Value;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.record.InvalidRecordException;
-import org.apache.kafka.common.record.RecordBatch;
import org.apache.kafka.common.record.Record;
+import org.apache.kafka.common.record.RecordBatch;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.requests.FetchRequest;
import org.apache.kafka.common.requests.FetchResponse;
@@ -55,11 +55,13 @@ import org.apache.kafka.common.requests.ListOffsetResponse;
import org.apache.kafka.common.requests.MetadataRequest;
import org.apache.kafka.common.requests.MetadataResponse;
import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.utils.CloseableIterator;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.Closeable;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
@@ -78,7 +80,7 @@ import java.util.concurrent.atomic.AtomicInteger;
/**
* This class manage the fetching process with the brokers.
*/
-public class Fetcher<K, V> implements SubscriptionState.Listener {
+public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable {
private static final Logger log = LoggerFactory.getLogger(Fetcher.class);
@@ -748,7 +750,7 @@ public class Fetcher<K, V> implements SubscriptionState.Listener {
TopicPartition tp = completedFetch.partition;
FetchResponse.PartitionData partition = completedFetch.partitionData;
long fetchOffset = completedFetch.fetchedOffset;
- PartitionRecords parsedRecords = null;
+ PartitionRecords partitionRecords = null;
Errors error = partition.error;
try {
@@ -769,7 +771,7 @@ public class Fetcher<K, V> implements SubscriptionState.Listener {
log.trace("Preparing to read {} bytes of data for partition {} with offset {}",
partition.records.sizeInBytes(), tp, position);
Iterator<? extends RecordBatch> batches = partition.records.batches().iterator();
- parsedRecords = new PartitionRecords(tp, completedFetch, batches);
+ partitionRecords = new PartitionRecords(tp, completedFetch, batches);
if (!batches.hasNext() && partition.records.sizeInBytes() > 0) {
if (completedFetch.responseVersion < 3) {
@@ -819,15 +821,16 @@ public class Fetcher<K, V> implements SubscriptionState.Listener {
throw new IllegalStateException("Unexpected error code " + error.code() + " while fetching data");
}
} finally {
- if (error != Errors.NONE) {
+ if (partitionRecords == null)
completedFetch.metricAggregator.record(tp, 0, 0);
+
+ if (error != Errors.NONE)
// we move the partition to the end if there was an error. This way, it's more likely that partitions for
// the same topic can remain together (allowing for more efficient serialization).
subscriptions.movePartitionToEnd(tp);
- }
}
- return parsedRecords;
+ return partitionRecords;
}
private ConsumerRecord<K, V> parseRecord(TopicPartition partition, RecordBatch batch, Record record) {
@@ -866,7 +869,7 @@ public class Fetcher<K, V> implements SubscriptionState.Listener {
private int recordsRead;
private int bytesRead;
private RecordBatch currentBatch;
- private Iterator<Record> records;
+ private CloseableIterator<Record> records;
private long nextFetchOffset;
private boolean isFetched = false;
@@ -881,6 +884,8 @@ public class Fetcher<K, V> implements SubscriptionState.Listener {
private void drain() {
if (!isFetched) {
+ maybeCloseRecordStream();
+
this.isFetched = true;
this.completedFetch.metricAggregator.record(partition, bytesRead, recordsRead);
@@ -913,16 +918,23 @@ public class Fetcher<K, V> implements SubscriptionState.Listener {
}
}
+ private void maybeCloseRecordStream() {
+ if (records != null)
+ records.close();
+ }
+
private Record nextFetchedRecord() {
while (true) {
if (records == null || !records.hasNext()) {
+ maybeCloseRecordStream();
+
if (!batches.hasNext()) {
drain();
return null;
}
currentBatch = batches.next();
maybeEnsureValid(currentBatch);
- records = currentBatch.iterator();
+ records = currentBatch.streamingIterator();
}
Record record = records.next();
@@ -939,12 +951,12 @@ public class Fetcher<K, V> implements SubscriptionState.Listener {
}
}
- private List<ConsumerRecord<K, V>> fetchRecords(int n) {
+ private List<ConsumerRecord<K, V>> fetchRecords(int maxRecords) {
if (isFetched)
return Collections.emptyList();
List<ConsumerRecord<K, V>> records = new ArrayList<>();
- for (int i = 0; i < n; i++) {
+ for (int i = 0; i < maxRecords; i++) {
Record record = nextFetchedRecord();
if (record == null)
break;
@@ -1173,4 +1185,10 @@ public class Fetcher<K, V> implements SubscriptionState.Listener {
}
}
+ @Override
+ public void close() {
+ if (nextInLineRecords != null)
+ nextInLineRecords.drain();
+ }
+
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/dd71e4a8/clients/src/main/java/org/apache/kafka/common/record/AbstractLegacyRecordBatch.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/AbstractLegacyRecordBatch.java b/clients/src/main/java/org/apache/kafka/common/record/AbstractLegacyRecordBatch.java
index 6deeb52..1b74a7d 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/AbstractLegacyRecordBatch.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/AbstractLegacyRecordBatch.java
@@ -21,6 +21,7 @@ import org.apache.kafka.common.errors.CorruptRecordException;
import org.apache.kafka.common.utils.AbstractIterator;
import org.apache.kafka.common.utils.ByteBufferInputStream;
import org.apache.kafka.common.utils.ByteUtils;
+import org.apache.kafka.common.utils.CloseableIterator;
import org.apache.kafka.common.utils.Utils;
import java.io.DataInputStream;
@@ -29,8 +30,7 @@ import java.io.EOFException;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
-import java.util.Collections;
-import java.util.Iterator;
+import java.util.NoSuchElementException;
import static org.apache.kafka.common.record.Records.LOG_OVERHEAD;
import static org.apache.kafka.common.record.Records.OFFSET_OFFSET;
@@ -155,7 +155,7 @@ public abstract class AbstractLegacyRecordBatch extends AbstractRecordBatch impl
@Override
public String toString() {
- return "LegacyRecordBatch(" + offset() + ", " + outerRecord() + ")";
+ return "LegacyRecordBatch(offset=" + offset() + ", " + outerRecord() + ")";
}
@Override
@@ -211,11 +211,40 @@ public abstract class AbstractLegacyRecordBatch extends AbstractRecordBatch impl
* @return An iterator over the records contained within this batch
*/
@Override
- public Iterator<Record> iterator() {
+ public CloseableIterator<Record> iterator() {
if (isCompressed())
return new DeepRecordsIterator(this, false, Integer.MAX_VALUE);
- else
- return Collections.<Record>singletonList(this).iterator();
+
+ return new CloseableIterator<Record>() {
+ private boolean hasNext = true;
+
+ @Override
+ public void close() {}
+
+ @Override
+ public boolean hasNext() {
+ return hasNext;
+ }
+
+ @Override
+ public Record next() {
+ if (!hasNext)
+ throw new NoSuchElementException();
+ hasNext = false;
+ return AbstractLegacyRecordBatch.this;
+ }
+
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException();
+ }
+ };
+ }
+
+ @Override
+ public CloseableIterator<Record> streamingIterator() {
+ // the older message format versions do not support streaming, so we return the normal iterator
+ return iterator();
}
static void writeHeader(ByteBuffer buffer, long offset, int size) {
@@ -256,7 +285,7 @@ public abstract class AbstractLegacyRecordBatch extends AbstractRecordBatch impl
}
}
- private static class DeepRecordsIterator extends AbstractIterator<Record> {
+ private static class DeepRecordsIterator extends AbstractIterator<Record> implements CloseableIterator<Record> {
private final ArrayDeque<AbstractLegacyRecordBatch> batches;
private final long absoluteBaseOffset;
private final byte wrapperMagic;
@@ -341,6 +370,9 @@ public abstract class AbstractLegacyRecordBatch extends AbstractRecordBatch impl
return entry;
}
+
+ @Override
+ public void close() {}
}
private static class BasicLegacyRecordBatch extends AbstractLegacyRecordBatch {
http://git-wip-us.apache.org/repos/asf/kafka/blob/dd71e4a8/clients/src/main/java/org/apache/kafka/common/record/DefaultRecord.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/DefaultRecord.java b/clients/src/main/java/org/apache/kafka/common/record/DefaultRecord.java
index 7c8dee5..a4b1d11 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/DefaultRecord.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/DefaultRecord.java
@@ -26,6 +26,7 @@ import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
+import java.util.Arrays;
import java.util.zip.Checksum;
import static org.apache.kafka.common.record.RecordBatch.MAGIC_VALUE_V2;
@@ -292,6 +293,46 @@ public class DefaultRecord implements Record {
return (attributes & CONTROL_FLAG_MASK) != 0;
}
+ @Override
+ public String toString() {
+ return String.format("DefaultRecord(offset=%d, timestamp=%d, key=%d bytes, value=%d bytes)",
+ offset,
+ timestamp,
+ key == null ? 0 : key.limit(),
+ value == null ? 0 : value.limit());
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o)
+ return true;
+ if (o == null || getClass() != o.getClass())
+ return false;
+
+ DefaultRecord that = (DefaultRecord) o;
+ return sizeInBytes == that.sizeInBytes &&
+ attributes == that.attributes &&
+ offset == that.offset &&
+ timestamp == that.timestamp &&
+ sequence == that.sequence &&
+ (key == null ? that.key == null : key.equals(that.key)) &&
+ (value == null ? that.value == null : value.equals(that.value)) &&
+ Arrays.equals(headers, that.headers);
+ }
+
+ @Override
+ public int hashCode() {
+ int result = sizeInBytes;
+ result = 31 * result + (int) attributes;
+ result = 31 * result + (int) (offset ^ (offset >>> 32));
+ result = 31 * result + (int) (timestamp ^ (timestamp >>> 32));
+ result = 31 * result + sequence;
+ result = 31 * result + (key != null ? key.hashCode() : 0);
+ result = 31 * result + (value != null ? value.hashCode() : 0);
+ result = 31 * result + Arrays.hashCode(headers);
+ return result;
+ }
+
public static DefaultRecord readFrom(DataInputStream input,
long baseOffset,
long baseTimestamp,
http://git-wip-us.apache.org/repos/asf/kafka/blob/dd71e4a8/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java b/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java
index 665ddd1..3eeea36 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java
@@ -19,12 +19,16 @@ package org.apache.kafka.common.record;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.utils.ByteBufferInputStream;
import org.apache.kafka.common.utils.ByteUtils;
+import org.apache.kafka.common.utils.CloseableIterator;
import org.apache.kafka.common.utils.Crc32C;
import java.io.DataInputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
+import java.util.ArrayList;
import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
import static org.apache.kafka.common.record.Records.LOG_OVERHEAD;
@@ -199,7 +203,7 @@ public class DefaultRecordBatch extends AbstractRecordBatch implements MutableRe
return buffer.getInt(PARTITION_LEADER_EPOCH_OFFSET);
}
- private Iterator<Record> compressedIterator() {
+ private CloseableIterator<Record> compressedIterator() {
ByteBuffer buffer = this.buffer.duplicate();
buffer.position(RECORDS_OFFSET);
final DataInputStream stream = new DataInputStream(compressionType().wrapForInput(
@@ -214,10 +218,19 @@ public class DefaultRecordBatch extends AbstractRecordBatch implements MutableRe
throw new KafkaException("Failed to decompress record stream", e);
}
}
+
+ @Override
+ public void close() {
+ try {
+ stream.close();
+ } catch (IOException e) {
+ throw new KafkaException("Failed to close record stream", e);
+ }
+ }
};
}
- private Iterator<Record> uncompressedIterator() {
+ private CloseableIterator<Record> uncompressedIterator() {
final ByteBuffer buffer = this.buffer.duplicate();
buffer.position(RECORDS_OFFSET);
return new RecordIterator() {
@@ -225,11 +238,30 @@ public class DefaultRecordBatch extends AbstractRecordBatch implements MutableRe
protected Record readNext(long baseOffset, long baseTimestamp, int baseSequence, Long logAppendTime) {
return DefaultRecord.readFrom(buffer, baseOffset, baseTimestamp, baseSequence, logAppendTime);
}
+ @Override
+ public void close() {}
};
}
@Override
public Iterator<Record> iterator() {
+ if (!isCompressed())
+ return uncompressedIterator();
+
+ // for a normal iterator, we cannot ensure that the underlying compression stream is closed,
+ // so we decompress the full record set here. Use cases which call for a lower memory footprint
+ // can use `streamingIterator` at the cost of additional complexity
+ try (CloseableIterator<Record> iterator = compressedIterator()) {
+ List<Record> records = new ArrayList<>(count());
+ while (iterator.hasNext())
+ records.add(iterator.next());
+ return records.iterator();
+ }
+ }
+
+
+ @Override
+ public CloseableIterator<Record> streamingIterator() {
if (isCompressed())
return compressedIterator();
else
@@ -348,7 +380,8 @@ public class DefaultRecordBatch extends AbstractRecordBatch implements MutableRe
@Override
public String toString() {
- return "RecordBatch(magic: " + magic() + ", offsets: [" + baseOffset() + ", " + lastOffset() + "])";
+ return "RecordBatch(magic=" + magic() + ", offsets=[" + baseOffset() + ", " + lastOffset() + "], " +
+ "compression=" + compressionType() + ", timestampType=" + timestampType() + ", crc=" + checksum() + ")";
}
public static int sizeInBytes(long baseOffset, Iterable<Record> records) {
@@ -396,7 +429,7 @@ public class DefaultRecordBatch extends AbstractRecordBatch implements MutableRe
return RECORD_BATCH_OVERHEAD + DefaultRecord.recordSizeUpperBound(key, value, headers);
}
- private abstract class RecordIterator implements Iterator<Record> {
+ private abstract class RecordIterator implements CloseableIterator<Record> {
private final Long logAppendTime;
private final long baseOffset;
private final long baseTimestamp;
@@ -423,6 +456,9 @@ public class DefaultRecordBatch extends AbstractRecordBatch implements MutableRe
@Override
public Record next() {
+ if (readRecords >= numRecords)
+ throw new NoSuchElementException();
+
readRecords++;
return readNext(baseOffset, baseTimestamp, baseSequence, logAppendTime);
}
@@ -433,5 +469,6 @@ public class DefaultRecordBatch extends AbstractRecordBatch implements MutableRe
public void remove() {
throw new UnsupportedOperationException();
}
+
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/dd71e4a8/clients/src/main/java/org/apache/kafka/common/record/FileLogInputStream.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/FileLogInputStream.java b/clients/src/main/java/org/apache/kafka/common/record/FileLogInputStream.java
index 59055ed..d5f10dc 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/FileLogInputStream.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/FileLogInputStream.java
@@ -18,6 +18,7 @@ package org.apache.kafka.common.record;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.errors.CorruptRecordException;
+import org.apache.kafka.common.utils.CloseableIterator;
import org.apache.kafka.common.utils.Utils;
import java.io.IOException;
@@ -224,6 +225,12 @@ public class FileLogInputStream implements LogInputStream<FileLogInputStream.Fil
}
@Override
+ public CloseableIterator<Record> streamingIterator() {
+ loadUnderlyingRecordBatch();
+ return underlying.streamingIterator();
+ }
+
+ @Override
public boolean isValid() {
loadUnderlyingRecordBatch();
return underlying.isValid();
http://git-wip-us.apache.org/repos/asf/kafka/blob/dd71e4a8/clients/src/main/java/org/apache/kafka/common/record/LegacyRecord.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/LegacyRecord.java b/clients/src/main/java/org/apache/kafka/common/record/LegacyRecord.java
index 69ee4c3..25185b0 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/LegacyRecord.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/LegacyRecord.java
@@ -279,7 +279,7 @@ public final class LegacyRecord {
public String toString() {
if (magic() > 0)
- return String.format("Record(magic = %d, attributes = %d, compression = %s, crc = %d, %s = %d, key = %d bytes, value = %d bytes)",
+ return String.format("Record(magic=%d, attributes=%d, compression=%s, crc=%d, %s=%d, key=%d bytes, value=%d bytes)",
magic(),
attributes(),
compressionType(),
@@ -289,7 +289,7 @@ public final class LegacyRecord {
key() == null ? 0 : key().limit(),
value() == null ? 0 : value().limit());
else
- return String.format("Record(magic = %d, attributes = %d, compression = %s, crc = %d, key = %d bytes, value = %d bytes)",
+ return String.format("Record(magic=%d, attributes=%d, compression=%s, crc=%d, key=%d bytes, value=%d bytes)",
magic(),
attributes(),
compressionType(),
http://git-wip-us.apache.org/repos/asf/kafka/blob/dd71e4a8/clients/src/main/java/org/apache/kafka/common/record/RecordBatch.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/RecordBatch.java b/clients/src/main/java/org/apache/kafka/common/record/RecordBatch.java
index 1cfb7f8..90f1486 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/RecordBatch.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/RecordBatch.java
@@ -16,7 +16,10 @@
*/
package org.apache.kafka.common.record;
+import org.apache.kafka.common.utils.CloseableIterator;
+
import java.nio.ByteBuffer;
+import java.util.Iterator;
/**
* A record batch is a container for records. In old versions of the record format (versions 0 and 1),
@@ -200,4 +203,12 @@ public interface RecordBatch extends Iterable<Record> {
*/
int partitionLeaderEpoch();
+ /**
+ * Return a streaming iterator which basically delays decompression of the record stream until the records
+ * are actually asked for using {@link Iterator#next()}. If the message format does not support streaming
+ * iteration, then the normal iterator is returned. Either way, callers should ensure that the iterator is closed.
+ *
+ * @return The closeable iterator
+ */
+ CloseableIterator<Record> streamingIterator();
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/dd71e4a8/clients/src/main/java/org/apache/kafka/common/record/SimpleRecord.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/SimpleRecord.java b/clients/src/main/java/org/apache/kafka/common/record/SimpleRecord.java
index 3a1c04c..0a5cbcf 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/SimpleRecord.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/SimpleRecord.java
@@ -106,4 +106,12 @@ public class SimpleRecord {
result = 31 * result + Arrays.hashCode(headers);
return result;
}
+
+ @Override
+ public String toString() {
+ return String.format("SimpleRecord(timestamp=%d, key=%d bytes, value=%d bytes)",
+ timestamp(),
+ key == null ? 0 : key.limit(),
+ value == null ? 0 : value.limit());
+ }
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/dd71e4a8/clients/src/main/java/org/apache/kafka/common/utils/CloseableIterator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/utils/CloseableIterator.java b/clients/src/main/java/org/apache/kafka/common/utils/CloseableIterator.java
new file mode 100644
index 0000000..38fba8e
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/utils/CloseableIterator.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.utils;
+
+import java.io.Closeable;
+import java.util.Iterator;
+
+/**
+ * Iterators that need to be closed in order to release resources should implement this interface.
+ *
+ * Warning: before implementing this interface, consider if there are better options. The chance of misuse is
+ * a bit high since people are used to iterating without closing.
+ */
+public interface CloseableIterator<T> extends Iterator<T>, Closeable {
+ void close();
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/dd71e4a8/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
index b03b461..092f549 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
@@ -45,6 +45,7 @@ import org.apache.kafka.common.record.ControlRecordType;
import org.apache.kafka.common.record.LegacyRecord;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.MemoryRecordsBuilder;
+import org.apache.kafka.common.record.Record;
import org.apache.kafka.common.record.RecordBatch;
import org.apache.kafka.common.record.SimpleRecord;
import org.apache.kafka.common.record.TimestampType;
@@ -60,6 +61,7 @@ import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.utils.ByteBufferOutputStream;
import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.test.TestUtils;
import org.junit.After;
import org.junit.Before;
@@ -88,7 +90,8 @@ public class FetcherTest {
private String topicName = "test";
private String groupId = "test-group";
private final String metricGroup = "consumer" + groupId + "-fetch-manager-metrics";
- private TopicPartition tp = new TopicPartition(topicName, 0);
+ private TopicPartition tp1 = new TopicPartition(topicName, 0);
+ private TopicPartition tp2 = new TopicPartition(topicName, 1);
private int minBytes = 1;
private int maxBytes = Integer.MAX_VALUE;
private int maxWaitMs = 0;
@@ -97,7 +100,7 @@ public class FetcherTest {
private MockTime time = new MockTime(1);
private Metadata metadata = new Metadata(0, Long.MAX_VALUE);
private MockClient client = new MockClient(time, metadata);
- private Cluster cluster = TestUtils.singletonCluster(topicName, 1);
+ private Cluster cluster = TestUtils.singletonCluster(topicName, 2);
private Node node = cluster.nodes().get(0);
private Metrics metrics = new Metrics(time);
private SubscriptionState subscriptions = new SubscriptionState(OffsetResetStrategy.EARLIEST);
@@ -132,12 +135,14 @@ public class FetcherTest {
public void teardown() {
this.metrics.close();
this.fetcherMetrics.close();
+ this.fetcher.close();
+ this.fetcherMetrics.close();
}
@Test
public void testFetchNormal() {
- subscriptions.assignFromUser(singleton(tp));
- subscriptions.seek(tp, 0);
+ subscriptions.assignFromUser(singleton(tp1));
+ subscriptions.seek(tp1, 0);
// normal fetch
assertEquals(1, fetcher.sendFetches());
@@ -148,11 +153,11 @@ public class FetcherTest {
assertTrue(fetcher.hasCompletedFetches());
Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> partitionRecords = fetcher.fetchedRecords();
- assertTrue(partitionRecords.containsKey(tp));
+ assertTrue(partitionRecords.containsKey(tp1));
- List<ConsumerRecord<byte[], byte[]>> records = partitionRecords.get(tp);
+ List<ConsumerRecord<byte[], byte[]>> records = partitionRecords.get(tp1);
assertEquals(3, records.size());
- assertEquals(4L, subscriptions.position(tp).longValue()); // this is the next fetching position
+ assertEquals(4L, subscriptions.position(tp1).longValue()); // this is the next fetching position
long offset = 1;
for (ConsumerRecord<byte[], byte[]> record : records) {
assertEquals(offset, record.offset());
@@ -162,8 +167,8 @@ public class FetcherTest {
@Test
public void testFetcherIgnoresControlRecords() {
- subscriptions.assignFromUser(singleton(tp));
- subscriptions.seek(tp, 0);
+ subscriptions.assignFromUser(singleton(tp1));
+ subscriptions.seek(tp1, 0);
// normal fetch
assertEquals(1, fetcher.sendFetches());
@@ -187,19 +192,19 @@ public class FetcherTest {
assertTrue(fetcher.hasCompletedFetches());
Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> partitionRecords = fetcher.fetchedRecords();
- assertTrue(partitionRecords.containsKey(tp));
+ assertTrue(partitionRecords.containsKey(tp1));
- List<ConsumerRecord<byte[], byte[]>> records = partitionRecords.get(tp);
+ List<ConsumerRecord<byte[], byte[]>> records = partitionRecords.get(tp1);
assertEquals(2, records.size());
- assertEquals(4L, subscriptions.position(tp).longValue());
+ assertEquals(4L, subscriptions.position(tp1).longValue());
for (ConsumerRecord<byte[], byte[]> record : records)
assertArrayEquals("key".getBytes(), record.key());
}
@Test
public void testFetchError() {
- subscriptions.assignFromUser(singleton(tp));
- subscriptions.seek(tp, 0);
+ subscriptions.assignFromUser(singleton(tp1));
+ subscriptions.seek(tp1, 0);
assertEquals(1, fetcher.sendFetches());
assertFalse(fetcher.hasCompletedFetches());
@@ -209,7 +214,7 @@ public class FetcherTest {
assertTrue(fetcher.hasCompletedFetches());
Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> partitionRecords = fetcher.fetchedRecords();
- assertFalse(partitionRecords.containsKey(tp));
+ assertFalse(partitionRecords.containsKey(tp1));
}
private MockClient.RequestMatcher matchesOffset(final TopicPartition tp, final long offset) {
@@ -239,10 +244,10 @@ public class FetcherTest {
Fetcher<byte[], byte[]> fetcher = createFetcher(subscriptions, new Metrics(time), deserializer, deserializer);
- subscriptions.assignFromUser(singleton(tp));
- subscriptions.seek(tp, 1);
+ subscriptions.assignFromUser(singleton(tp1));
+ subscriptions.seek(tp1, 1);
- client.prepareResponse(matchesOffset(tp, 1), fetchResponse(this.records, Errors.NONE, 100L, 0));
+ client.prepareResponse(matchesOffset(tp1, 1), fetchResponse(this.records, Errors.NONE, 100L, 0));
assertEquals(1, fetcher.sendFetches());
consumerClient.poll(0);
@@ -251,7 +256,7 @@ public class FetcherTest {
fail("fetchedRecords should have raised");
} catch (SerializationException e) {
// the position should not advance since no data has been returned
- assertEquals(1, subscriptions.position(tp).longValue());
+ assertEquals(1, subscriptions.position(tp1).longValue());
}
}
@@ -282,8 +287,8 @@ public class FetcherTest {
buffer.flip();
- subscriptions.assignFromUser(singleton(tp));
- subscriptions.seek(tp, 0);
+ subscriptions.assignFromUser(singleton(tp1));
+ subscriptions.seek(tp1, 0);
// normal fetch
assertEquals(1, fetcher.sendFetches());
@@ -294,7 +299,7 @@ public class FetcherTest {
fail("fetchedRecords should have raised");
} catch (KafkaException e) {
// the position should not advance since no data has been returned
- assertEquals(0, subscriptions.position(tp).longValue());
+ assertEquals(0, subscriptions.position(tp1).longValue());
}
}
@@ -310,8 +315,8 @@ public class FetcherTest {
// flip some bits to fail the crc
buffer.putInt(32, buffer.get(32) ^ 87238423);
- subscriptions.assignFromUser(singleton(tp));
- subscriptions.seek(tp, 0);
+ subscriptions.assignFromUser(singleton(tp1));
+ subscriptions.seek(tp1, 0);
// normal fetch
assertEquals(1, fetcher.sendFetches());
@@ -322,7 +327,7 @@ public class FetcherTest {
fail("fetchedRecords should have raised");
} catch (KafkaException e) {
// the position should not advance since no data has been returned
- assertEquals(0, subscriptions.position(tp).longValue());
+ assertEquals(0, subscriptions.position(tp1).longValue());
}
}
@@ -331,32 +336,32 @@ public class FetcherTest {
Fetcher<byte[], byte[]> fetcher = createFetcher(subscriptions, new Metrics(time), 2);
List<ConsumerRecord<byte[], byte[]>> records;
- subscriptions.assignFromUser(singleton(tp));
- subscriptions.seek(tp, 1);
+ subscriptions.assignFromUser(singleton(tp1));
+ subscriptions.seek(tp1, 1);
- client.prepareResponse(matchesOffset(tp, 1), fetchResponse(this.records, Errors.NONE, 100L, 0));
- client.prepareResponse(matchesOffset(tp, 4), fetchResponse(this.nextRecords, Errors.NONE, 100L, 0));
+ client.prepareResponse(matchesOffset(tp1, 1), fetchResponse(this.records, Errors.NONE, 100L, 0));
+ client.prepareResponse(matchesOffset(tp1, 4), fetchResponse(this.nextRecords, Errors.NONE, 100L, 0));
assertEquals(1, fetcher.sendFetches());
consumerClient.poll(0);
- records = fetcher.fetchedRecords().get(tp);
+ records = fetcher.fetchedRecords().get(tp1);
assertEquals(2, records.size());
- assertEquals(3L, subscriptions.position(tp).longValue());
+ assertEquals(3L, subscriptions.position(tp1).longValue());
assertEquals(1, records.get(0).offset());
assertEquals(2, records.get(1).offset());
assertEquals(0, fetcher.sendFetches());
consumerClient.poll(0);
- records = fetcher.fetchedRecords().get(tp);
+ records = fetcher.fetchedRecords().get(tp1);
assertEquals(1, records.size());
- assertEquals(4L, subscriptions.position(tp).longValue());
+ assertEquals(4L, subscriptions.position(tp1).longValue());
assertEquals(3, records.get(0).offset());
assertTrue(fetcher.sendFetches() > 0);
consumerClient.poll(0);
- records = fetcher.fetchedRecords().get(tp);
+ records = fetcher.fetchedRecords().get(tp1);
assertEquals(2, records.size());
- assertEquals(6L, subscriptions.position(tp).longValue());
+ assertEquals(6L, subscriptions.position(tp1).longValue());
assertEquals(4, records.get(0).offset());
assertEquals(5, records.get(1).offset());
}
@@ -374,16 +379,16 @@ public class FetcherTest {
MemoryRecords records = builder.build();
List<ConsumerRecord<byte[], byte[]>> consumerRecords;
- subscriptions.assignFromUser(singleton(tp));
- subscriptions.seek(tp, 0);
+ subscriptions.assignFromUser(singleton(tp1));
+ subscriptions.seek(tp1, 0);
// normal fetch
assertEquals(1, fetcher.sendFetches());
client.prepareResponse(fetchResponse(records, Errors.NONE, 100L, 0));
consumerClient.poll(0);
- consumerRecords = fetcher.fetchedRecords().get(tp);
+ consumerRecords = fetcher.fetchedRecords().get(tp1);
assertEquals(3, consumerRecords.size());
- assertEquals(31L, subscriptions.position(tp).longValue()); // this is the next fetching position
+ assertEquals(31L, subscriptions.position(tp1).longValue()); // this is the next fetching position
assertEquals(15L, consumerRecords.get(0).offset());
assertEquals(20L, consumerRecords.get(1).offset());
@@ -406,7 +411,7 @@ public class FetcherTest {
} catch (RecordTooLargeException e) {
assertTrue(e.getMessage().startsWith("There are some messages at [Partition=Offset]: "));
// the position should not advance since no data has been returned
- assertEquals(0, subscriptions.position(tp).longValue());
+ assertEquals(0, subscriptions.position(tp1).longValue());
}
} finally {
client.setNodeApiVersions(NodeApiVersions.create());
@@ -428,13 +433,13 @@ public class FetcherTest {
} catch (KafkaException e) {
assertTrue(e.getMessage().startsWith("Failed to make progress reading messages"));
// the position should not advance since no data has been returned
- assertEquals(0, subscriptions.position(tp).longValue());
+ assertEquals(0, subscriptions.position(tp1).longValue());
}
}
private void makeFetchRequestWithIncompleteRecord() {
- subscriptions.assignFromUser(singleton(tp));
- subscriptions.seek(tp, 0);
+ subscriptions.assignFromUser(singleton(tp1));
+ subscriptions.seek(tp1, 0);
assertEquals(1, fetcher.sendFetches());
assertFalse(fetcher.hasCompletedFetches());
MemoryRecords partialRecord = MemoryRecords.readableRecords(
@@ -446,8 +451,8 @@ public class FetcherTest {
@Test
public void testUnauthorizedTopic() {
- subscriptions.assignFromUser(singleton(tp));
- subscriptions.seek(tp, 0);
+ subscriptions.assignFromUser(singleton(tp1));
+ subscriptions.seek(tp1, 0);
// resize the limit of the buffer to pretend it is only fetch-size large
assertEquals(1, fetcher.sendFetches());
@@ -464,13 +469,13 @@ public class FetcherTest {
@Test
public void testFetchDuringRebalance() {
subscriptions.subscribe(singleton(topicName), listener);
- subscriptions.assignFromSubscribed(singleton(tp));
- subscriptions.seek(tp, 0);
+ subscriptions.assignFromSubscribed(singleton(tp1));
+ subscriptions.seek(tp1, 0);
assertEquals(1, fetcher.sendFetches());
// Now the rebalance happens and fetch positions are cleared
- subscriptions.assignFromSubscribed(singleton(tp));
+ subscriptions.assignFromSubscribed(singleton(tp1));
client.prepareResponse(fetchResponse(this.records, Errors.NONE, 100L, 0));
consumerClient.poll(0);
@@ -480,31 +485,31 @@ public class FetcherTest {
@Test
public void testInFlightFetchOnPausedPartition() {
- subscriptions.assignFromUser(singleton(tp));
- subscriptions.seek(tp, 0);
+ subscriptions.assignFromUser(singleton(tp1));
+ subscriptions.seek(tp1, 0);
assertEquals(1, fetcher.sendFetches());
- subscriptions.pause(tp);
+ subscriptions.pause(tp1);
client.prepareResponse(fetchResponse(this.records, Errors.NONE, 100L, 0));
consumerClient.poll(0);
- assertNull(fetcher.fetchedRecords().get(tp));
+ assertNull(fetcher.fetchedRecords().get(tp1));
}
@Test
public void testFetchOnPausedPartition() {
- subscriptions.assignFromUser(singleton(tp));
- subscriptions.seek(tp, 0);
+ subscriptions.assignFromUser(singleton(tp1));
+ subscriptions.seek(tp1, 0);
- subscriptions.pause(tp);
+ subscriptions.pause(tp1);
assertFalse(fetcher.sendFetches() > 0);
assertTrue(client.requests().isEmpty());
}
@Test
public void testFetchNotLeaderForPartition() {
- subscriptions.assignFromUser(singleton(tp));
- subscriptions.seek(tp, 0);
+ subscriptions.assignFromUser(singleton(tp1));
+ subscriptions.seek(tp1, 0);
assertEquals(1, fetcher.sendFetches());
client.prepareResponse(fetchResponse(this.records, Errors.NOT_LEADER_FOR_PARTITION, 100L, 0));
@@ -515,8 +520,8 @@ public class FetcherTest {
@Test
public void testFetchUnknownTopicOrPartition() {
- subscriptions.assignFromUser(singleton(tp));
- subscriptions.seek(tp, 0);
+ subscriptions.assignFromUser(singleton(tp1));
+ subscriptions.seek(tp1, 0);
assertEquals(1, fetcher.sendFetches());
client.prepareResponse(fetchResponse(this.records, Errors.UNKNOWN_TOPIC_OR_PARTITION, 100L, 0));
@@ -527,61 +532,61 @@ public class FetcherTest {
@Test
public void testFetchOffsetOutOfRange() {
- subscriptions.assignFromUser(singleton(tp));
- subscriptions.seek(tp, 0);
+ subscriptions.assignFromUser(singleton(tp1));
+ subscriptions.seek(tp1, 0);
assertEquals(1, fetcher.sendFetches());
client.prepareResponse(fetchResponse(this.records, Errors.OFFSET_OUT_OF_RANGE, 100L, 0));
consumerClient.poll(0);
assertEquals(0, fetcher.fetchedRecords().size());
- assertTrue(subscriptions.isOffsetResetNeeded(tp));
- assertEquals(null, subscriptions.position(tp));
+ assertTrue(subscriptions.isOffsetResetNeeded(tp1));
+ assertEquals(null, subscriptions.position(tp1));
}
@Test
public void testStaleOutOfRangeError() {
// verify that an out of range error which arrives after a seek
// does not cause us to reset our position or throw an exception
- subscriptions.assignFromUser(singleton(tp));
- subscriptions.seek(tp, 0);
+ subscriptions.assignFromUser(singleton(tp1));
+ subscriptions.seek(tp1, 0);
assertEquals(1, fetcher.sendFetches());
client.prepareResponse(fetchResponse(this.records, Errors.OFFSET_OUT_OF_RANGE, 100L, 0));
- subscriptions.seek(tp, 1);
+ subscriptions.seek(tp1, 1);
consumerClient.poll(0);
assertEquals(0, fetcher.fetchedRecords().size());
- assertFalse(subscriptions.isOffsetResetNeeded(tp));
- assertEquals(1, subscriptions.position(tp).longValue());
+ assertFalse(subscriptions.isOffsetResetNeeded(tp1));
+ assertEquals(1, subscriptions.position(tp1).longValue());
}
@Test
public void testFetchedRecordsAfterSeek() {
- subscriptionsNoAutoReset.assignFromUser(singleton(tp));
- subscriptionsNoAutoReset.seek(tp, 0);
+ subscriptionsNoAutoReset.assignFromUser(singleton(tp1));
+ subscriptionsNoAutoReset.seek(tp1, 0);
assertTrue(fetcherNoAutoReset.sendFetches() > 0);
client.prepareResponse(fetchResponse(this.records, Errors.OFFSET_OUT_OF_RANGE, 100L, 0));
consumerClient.poll(0);
- assertFalse(subscriptionsNoAutoReset.isOffsetResetNeeded(tp));
- subscriptionsNoAutoReset.seek(tp, 2);
+ assertFalse(subscriptionsNoAutoReset.isOffsetResetNeeded(tp1));
+ subscriptionsNoAutoReset.seek(tp1, 2);
assertEquals(0, fetcherNoAutoReset.fetchedRecords().size());
}
@Test
public void testFetchOffsetOutOfRangeException() {
- subscriptionsNoAutoReset.assignFromUser(singleton(tp));
- subscriptionsNoAutoReset.seek(tp, 0);
+ subscriptionsNoAutoReset.assignFromUser(singleton(tp1));
+ subscriptionsNoAutoReset.seek(tp1, 0);
fetcherNoAutoReset.sendFetches();
client.prepareResponse(fetchResponse(this.records, Errors.OFFSET_OUT_OF_RANGE, 100L, 0));
consumerClient.poll(0);
- assertFalse(subscriptionsNoAutoReset.isOffsetResetNeeded(tp));
+ assertFalse(subscriptionsNoAutoReset.isOffsetResetNeeded(tp1));
try {
fetcherNoAutoReset.fetchedRecords();
fail("Should have thrown OffsetOutOfRangeException");
} catch (OffsetOutOfRangeException e) {
- assertTrue(e.offsetOutOfRangePartitions().containsKey(tp));
+ assertTrue(e.offsetOutOfRangePartitions().containsKey(tp1));
assertEquals(e.offsetOutOfRangePartitions().size(), 1);
}
assertEquals(0, fetcherNoAutoReset.fetchedRecords().size());
@@ -589,8 +594,8 @@ public class FetcherTest {
@Test
public void testFetchDisconnected() {
- subscriptions.assignFromUser(singleton(tp));
- subscriptions.seek(tp, 0);
+ subscriptions.assignFromUser(singleton(tp1));
+ subscriptions.seek(tp1, 0);
assertEquals(1, fetcher.sendFetches());
client.prepareResponse(fetchResponse(this.records, Errors.NONE, 100L, 0), true);
@@ -598,66 +603,66 @@ public class FetcherTest {
assertEquals(0, fetcher.fetchedRecords().size());
// disconnects should have no affect on subscription state
- assertFalse(subscriptions.isOffsetResetNeeded(tp));
- assertTrue(subscriptions.isFetchable(tp));
- assertEquals(0, subscriptions.position(tp).longValue());
+ assertFalse(subscriptions.isOffsetResetNeeded(tp1));
+ assertTrue(subscriptions.isFetchable(tp1));
+ assertEquals(0, subscriptions.position(tp1).longValue());
}
@Test
public void testUpdateFetchPositionToCommitted() {
// unless a specific reset is expected, the default behavior is to reset to the committed
// position if one is present
- subscriptions.assignFromUser(singleton(tp));
- subscriptions.committed(tp, new OffsetAndMetadata(5));
+ subscriptions.assignFromUser(singleton(tp1));
+ subscriptions.committed(tp1, new OffsetAndMetadata(5));
- fetcher.updateFetchPositions(singleton(tp));
- assertTrue(subscriptions.isFetchable(tp));
- assertEquals(5, subscriptions.position(tp).longValue());
+ fetcher.updateFetchPositions(singleton(tp1));
+ assertTrue(subscriptions.isFetchable(tp1));
+ assertEquals(5, subscriptions.position(tp1).longValue());
}
@Test
public void testUpdateFetchPositionResetToDefaultOffset() {
- subscriptions.assignFromUser(singleton(tp));
+ subscriptions.assignFromUser(singleton(tp1));
// with no commit position, we should reset using the default strategy defined above (EARLIEST)
client.prepareResponse(listOffsetRequestMatcher(ListOffsetRequest.EARLIEST_TIMESTAMP),
listOffsetResponse(Errors.NONE, 1L, 5L));
- fetcher.updateFetchPositions(singleton(tp));
- assertFalse(subscriptions.isOffsetResetNeeded(tp));
- assertTrue(subscriptions.isFetchable(tp));
- assertEquals(5, subscriptions.position(tp).longValue());
+ fetcher.updateFetchPositions(singleton(tp1));
+ assertFalse(subscriptions.isOffsetResetNeeded(tp1));
+ assertTrue(subscriptions.isFetchable(tp1));
+ assertEquals(5, subscriptions.position(tp1).longValue());
}
@Test
public void testUpdateFetchPositionResetToLatestOffset() {
- subscriptions.assignFromUser(singleton(tp));
- subscriptions.needOffsetReset(tp, OffsetResetStrategy.LATEST);
+ subscriptions.assignFromUser(singleton(tp1));
+ subscriptions.needOffsetReset(tp1, OffsetResetStrategy.LATEST);
client.prepareResponse(listOffsetRequestMatcher(ListOffsetRequest.LATEST_TIMESTAMP),
listOffsetResponse(Errors.NONE, 1L, 5L));
- fetcher.updateFetchPositions(singleton(tp));
- assertFalse(subscriptions.isOffsetResetNeeded(tp));
- assertTrue(subscriptions.isFetchable(tp));
- assertEquals(5, subscriptions.position(tp).longValue());
+ fetcher.updateFetchPositions(singleton(tp1));
+ assertFalse(subscriptions.isOffsetResetNeeded(tp1));
+ assertTrue(subscriptions.isFetchable(tp1));
+ assertEquals(5, subscriptions.position(tp1).longValue());
}
@Test
public void testUpdateFetchPositionResetToEarliestOffset() {
- subscriptions.assignFromUser(singleton(tp));
- subscriptions.needOffsetReset(tp, OffsetResetStrategy.EARLIEST);
+ subscriptions.assignFromUser(singleton(tp1));
+ subscriptions.needOffsetReset(tp1, OffsetResetStrategy.EARLIEST);
client.prepareResponse(listOffsetRequestMatcher(ListOffsetRequest.EARLIEST_TIMESTAMP),
listOffsetResponse(Errors.NONE, 1L, 5L));
- fetcher.updateFetchPositions(singleton(tp));
- assertFalse(subscriptions.isOffsetResetNeeded(tp));
- assertTrue(subscriptions.isFetchable(tp));
- assertEquals(5, subscriptions.position(tp).longValue());
+ fetcher.updateFetchPositions(singleton(tp1));
+ assertFalse(subscriptions.isOffsetResetNeeded(tp1));
+ assertTrue(subscriptions.isFetchable(tp1));
+ assertEquals(5, subscriptions.position(tp1).longValue());
}
@Test
public void testUpdateFetchPositionDisconnect() {
- subscriptions.assignFromUser(singleton(tp));
- subscriptions.needOffsetReset(tp, OffsetResetStrategy.LATEST);
+ subscriptions.assignFromUser(singleton(tp1));
+ subscriptions.needOffsetReset(tp1, OffsetResetStrategy.LATEST);
// First request gets a disconnect
client.prepareResponse(listOffsetRequestMatcher(ListOffsetRequest.LATEST_TIMESTAMP),
@@ -666,72 +671,72 @@ public class FetcherTest {
// Next one succeeds
client.prepareResponse(listOffsetRequestMatcher(ListOffsetRequest.LATEST_TIMESTAMP),
listOffsetResponse(Errors.NONE, 1L, 5L));
- fetcher.updateFetchPositions(singleton(tp));
- assertFalse(subscriptions.isOffsetResetNeeded(tp));
- assertTrue(subscriptions.isFetchable(tp));
- assertEquals(5, subscriptions.position(tp).longValue());
+ fetcher.updateFetchPositions(singleton(tp1));
+ assertFalse(subscriptions.isOffsetResetNeeded(tp1));
+ assertTrue(subscriptions.isFetchable(tp1));
+ assertEquals(5, subscriptions.position(tp1).longValue());
}
@Test
public void testUpdateFetchPositionOfPausedPartitionsRequiringOffsetReset() {
- subscriptions.assignFromUser(singleton(tp));
- subscriptions.committed(tp, new OffsetAndMetadata(0));
- subscriptions.pause(tp); // paused partition does not have a valid position
- subscriptions.needOffsetReset(tp, OffsetResetStrategy.LATEST);
+ subscriptions.assignFromUser(singleton(tp1));
+ subscriptions.committed(tp1, new OffsetAndMetadata(0));
+ subscriptions.pause(tp1); // paused partition does not have a valid position
+ subscriptions.needOffsetReset(tp1, OffsetResetStrategy.LATEST);
client.prepareResponse(listOffsetRequestMatcher(ListOffsetRequest.LATEST_TIMESTAMP),
listOffsetResponse(Errors.NONE, 1L, 10L));
- fetcher.updateFetchPositions(singleton(tp));
+ fetcher.updateFetchPositions(singleton(tp1));
- assertFalse(subscriptions.isOffsetResetNeeded(tp));
- assertFalse(subscriptions.isFetchable(tp)); // because tp is paused
- assertTrue(subscriptions.hasValidPosition(tp));
- assertEquals(10, subscriptions.position(tp).longValue());
+ assertFalse(subscriptions.isOffsetResetNeeded(tp1));
+ assertFalse(subscriptions.isFetchable(tp1)); // because tp is paused
+ assertTrue(subscriptions.hasValidPosition(tp1));
+ assertEquals(10, subscriptions.position(tp1).longValue());
}
@Test
public void testUpdateFetchPositionOfPausedPartitionsWithoutACommittedOffset() {
- subscriptions.assignFromUser(singleton(tp));
- subscriptions.pause(tp); // paused partition does not have a valid position
+ subscriptions.assignFromUser(singleton(tp1));
+ subscriptions.pause(tp1); // paused partition does not have a valid position
client.prepareResponse(listOffsetRequestMatcher(ListOffsetRequest.EARLIEST_TIMESTAMP),
listOffsetResponse(Errors.NONE, 1L, 0L));
- fetcher.updateFetchPositions(singleton(tp));
+ fetcher.updateFetchPositions(singleton(tp1));
- assertFalse(subscriptions.isOffsetResetNeeded(tp));
- assertFalse(subscriptions.isFetchable(tp)); // because tp is paused
- assertTrue(subscriptions.hasValidPosition(tp));
- assertEquals(0, subscriptions.position(tp).longValue());
+ assertFalse(subscriptions.isOffsetResetNeeded(tp1));
+ assertFalse(subscriptions.isFetchable(tp1)); // because tp is paused
+ assertTrue(subscriptions.hasValidPosition(tp1));
+ assertEquals(0, subscriptions.position(tp1).longValue());
}
@Test
public void testUpdateFetchPositionOfPausedPartitionsWithoutAValidPosition() {
- subscriptions.assignFromUser(singleton(tp));
- subscriptions.committed(tp, new OffsetAndMetadata(0));
- subscriptions.pause(tp); // paused partition does not have a valid position
- subscriptions.seek(tp, 10);
+ subscriptions.assignFromUser(singleton(tp1));
+ subscriptions.committed(tp1, new OffsetAndMetadata(0));
+ subscriptions.pause(tp1); // paused partition does not have a valid position
+ subscriptions.seek(tp1, 10);
- fetcher.updateFetchPositions(singleton(tp));
+ fetcher.updateFetchPositions(singleton(tp1));
- assertFalse(subscriptions.isOffsetResetNeeded(tp));
- assertFalse(subscriptions.isFetchable(tp)); // because tp is paused
- assertTrue(subscriptions.hasValidPosition(tp));
- assertEquals(10, subscriptions.position(tp).longValue());
+ assertFalse(subscriptions.isOffsetResetNeeded(tp1));
+ assertFalse(subscriptions.isFetchable(tp1)); // because tp is paused
+ assertTrue(subscriptions.hasValidPosition(tp1));
+ assertEquals(10, subscriptions.position(tp1).longValue());
}
@Test
public void testUpdateFetchPositionOfPausedPartitionsWithAValidPosition() {
- subscriptions.assignFromUser(singleton(tp));
- subscriptions.committed(tp, new OffsetAndMetadata(0));
- subscriptions.seek(tp, 10);
- subscriptions.pause(tp); // paused partition already has a valid position
+ subscriptions.assignFromUser(singleton(tp1));
+ subscriptions.committed(tp1, new OffsetAndMetadata(0));
+ subscriptions.seek(tp1, 10);
+ subscriptions.pause(tp1); // paused partition already has a valid position
- fetcher.updateFetchPositions(singleton(tp));
+ fetcher.updateFetchPositions(singleton(tp1));
- assertFalse(subscriptions.isOffsetResetNeeded(tp));
- assertFalse(subscriptions.isFetchable(tp)); // because tp is paused
- assertTrue(subscriptions.hasValidPosition(tp));
- assertEquals(10, subscriptions.position(tp).longValue());
+ assertFalse(subscriptions.isOffsetResetNeeded(tp1));
+ assertFalse(subscriptions.isFetchable(tp1)); // because tp is paused
+ assertTrue(subscriptions.hasValidPosition(tp1));
+ assertEquals(10, subscriptions.position(tp1).longValue());
}
@Test
@@ -803,8 +808,8 @@ public class FetcherTest {
*/
@Test
public void testQuotaMetrics() throws Exception {
- subscriptions.assignFromUser(singleton(tp));
- subscriptions.seek(tp, 0);
+ subscriptions.assignFromUser(singleton(tp1));
+ subscriptions.seek(tp1, 0);
// normal fetch
for (int i = 1; i < 4; i++) {
@@ -813,14 +818,14 @@ public class FetcherTest {
MemoryRecordsBuilder builder = MemoryRecords.builder(ByteBuffer.allocate(1024), CompressionType.NONE,
TimestampType.CREATE_TIME, 0L);
for (int v = 0; v < 3; v++)
- builder.appendWithOffset((long) i * 3 + v, RecordBatch.NO_TIMESTAMP, "key".getBytes(), String.format("value-%d", v).getBytes());
- List<ConsumerRecord<byte[], byte[]>> records = fetchRecords(builder.build(), Errors.NONE, 100L, 100 * i).get(tp);
+ builder.appendWithOffset(i * 3 + v, RecordBatch.NO_TIMESTAMP, "key".getBytes(), ("value-" + v).getBytes());
+ List<ConsumerRecord<byte[], byte[]>> records = fetchRecords(builder.build(), Errors.NONE, 100L, 100 * i).get(tp1);
assertEquals(3, records.size());
}
Map<MetricName, KafkaMetric> allMetrics = metrics.metrics();
- KafkaMetric avgMetric = allMetrics.get(metrics.metricName("fetch-throttle-time-avg", metricGroup, ""));
- KafkaMetric maxMetric = allMetrics.get(metrics.metricName("fetch-throttle-time-max", metricGroup, ""));
+ KafkaMetric avgMetric = allMetrics.get(metrics.metricName("fetch-throttle-time-avg", metricGroup));
+ KafkaMetric maxMetric = allMetrics.get(metrics.metricName("fetch-throttle-time-max", metricGroup));
assertEquals(200, avgMetric.value(), EPSILON);
assertEquals(300, maxMetric.value(), EPSILON);
}
@@ -830,11 +835,11 @@ public class FetcherTest {
*/
@Test
public void testFetcherMetrics() {
- subscriptions.assignFromUser(singleton(tp));
- subscriptions.seek(tp, 0);
+ subscriptions.assignFromUser(singleton(tp1));
+ subscriptions.seek(tp1, 0);
- MetricName maxLagMetric = metrics.metricName("records-lag-max", metricGroup, "");
- MetricName partitionLagMetric = metrics.metricName(tp + ".records-lag", metricGroup, "");
+ MetricName maxLagMetric = metrics.metricName("records-lag-max", metricGroup);
+ MetricName partitionLagMetric = metrics.metricName(tp1 + ".records-lag", metricGroup);
Map<MetricName, KafkaMetric> allMetrics = metrics.metrics();
KafkaMetric recordsFetchLagMax = allMetrics.get(maxLagMetric);
@@ -853,7 +858,7 @@ public class FetcherTest {
MemoryRecordsBuilder builder = MemoryRecords.builder(ByteBuffer.allocate(1024), CompressionType.NONE,
TimestampType.CREATE_TIME, 0L);
for (int v = 0; v < 3; v++)
- builder.appendWithOffset((long) v, RecordBatch.NO_TIMESTAMP, "key".getBytes(), String.format("value-%d", v).getBytes());
+ builder.appendWithOffset(v, RecordBatch.NO_TIMESTAMP, "key".getBytes(), ("value-" + v).getBytes());
fetchRecords(builder.build(), Errors.NONE, 200L, 0);
assertEquals(197, recordsFetchLagMax.value(), EPSILON);
@@ -862,6 +867,131 @@ public class FetcherTest {
assertFalse(allMetrics.containsKey(partitionLagMetric));
}
+ @Test
+ public void testFetchResponseMetrics() {
+ subscriptions.assignFromUser(singleton(tp1));
+ subscriptions.seek(tp1, 0);
+
+ Map<MetricName, KafkaMetric> allMetrics = metrics.metrics();
+ KafkaMetric fetchSizeAverage = allMetrics.get(metrics.metricName("fetch-size-avg", metricGroup));
+ KafkaMetric recordsCountAverage = allMetrics.get(metrics.metricName("records-per-request-avg", metricGroup));
+
+ MemoryRecordsBuilder builder = MemoryRecords.builder(ByteBuffer.allocate(1024), CompressionType.NONE,
+ TimestampType.CREATE_TIME, 0L);
+ for (int v = 0; v < 3; v++)
+ builder.appendWithOffset(v, RecordBatch.NO_TIMESTAMP, "key".getBytes(), ("value-" + v).getBytes());
+ MemoryRecords records = builder.build();
+
+ int expectedBytes = 0;
+ for (Record record : records.records())
+ expectedBytes += record.sizeInBytes();
+
+ fetchRecords(records, Errors.NONE, 100L, 0);
+ assertEquals(expectedBytes, fetchSizeAverage.value(), EPSILON);
+ assertEquals(3, recordsCountAverage.value(), EPSILON);
+ }
+
+ @Test
+ public void testFetchResponseMetricsPartialResponse() {
+ subscriptions.assignFromUser(singleton(tp1));
+ subscriptions.seek(tp1, 1);
+
+ Map<MetricName, KafkaMetric> allMetrics = metrics.metrics();
+ KafkaMetric fetchSizeAverage = allMetrics.get(metrics.metricName("fetch-size-avg", metricGroup));
+ KafkaMetric recordsCountAverage = allMetrics.get(metrics.metricName("records-per-request-avg", metricGroup));
+
+ MemoryRecordsBuilder builder = MemoryRecords.builder(ByteBuffer.allocate(1024), CompressionType.NONE,
+ TimestampType.CREATE_TIME, 0L);
+ for (int v = 0; v < 3; v++)
+ builder.appendWithOffset(v, RecordBatch.NO_TIMESTAMP, "key".getBytes(), ("value-" + v).getBytes());
+ MemoryRecords records = builder.build();
+
+ int expectedBytes = 0;
+ for (Record record : records.records()) {
+ if (record.offset() >= 1)
+ expectedBytes += record.sizeInBytes();
+ }
+
+ fetchRecords(records, Errors.NONE, 100L, 0);
+ assertEquals(expectedBytes, fetchSizeAverage.value(), EPSILON);
+ assertEquals(2, recordsCountAverage.value(), EPSILON);
+ }
+
+ @Test
+ public void testFetchResponseMetricsWithOnePartitionError() {
+ subscriptions.assignFromUser(Utils.mkSet(tp1, tp2));
+ subscriptions.seek(tp1, 0);
+ subscriptions.seek(tp2, 0);
+
+ Map<MetricName, KafkaMetric> allMetrics = metrics.metrics();
+ KafkaMetric fetchSizeAverage = allMetrics.get(metrics.metricName("fetch-size-avg", metricGroup));
+ KafkaMetric recordsCountAverage = allMetrics.get(metrics.metricName("records-per-request-avg", metricGroup));
+
+ MemoryRecordsBuilder builder = MemoryRecords.builder(ByteBuffer.allocate(1024), CompressionType.NONE,
+ TimestampType.CREATE_TIME, 0L);
+ for (int v = 0; v < 3; v++)
+ builder.appendWithOffset(v, RecordBatch.NO_TIMESTAMP, "key".getBytes(), ("value-" + v).getBytes());
+ MemoryRecords records = builder.build();
+
+ Map<TopicPartition, FetchResponse.PartitionData> partitions = new HashMap<>();
+ partitions.put(tp1, new FetchResponse.PartitionData(Errors.NONE, 100,
+ FetchResponse.INVALID_LAST_STABLE_OFFSET, 0L, null, records));
+ partitions.put(tp2, new FetchResponse.PartitionData(Errors.OFFSET_OUT_OF_RANGE, 100,
+ FetchResponse.INVALID_LAST_STABLE_OFFSET, 0L, null, MemoryRecords.EMPTY));
+
+ assertEquals(1, fetcher.sendFetches());
+ client.prepareResponse(new FetchResponse(new LinkedHashMap<>(partitions), 0));
+ consumerClient.poll(0);
+ fetcher.fetchedRecords();
+
+ int expectedBytes = 0;
+ for (Record record : records.records())
+ expectedBytes += record.sizeInBytes();
+
+ assertEquals(expectedBytes, fetchSizeAverage.value(), EPSILON);
+ assertEquals(3, recordsCountAverage.value(), EPSILON);
+ }
+
+ @Test
+ public void testFetchResponseMetricsWithOnePartitionAtTheWrongOffset() {
+ subscriptions.assignFromUser(Utils.mkSet(tp1, tp2));
+ subscriptions.seek(tp1, 0);
+ subscriptions.seek(tp2, 0);
+
+ Map<MetricName, KafkaMetric> allMetrics = metrics.metrics();
+ KafkaMetric fetchSizeAverage = allMetrics.get(metrics.metricName("fetch-size-avg", metricGroup));
+ KafkaMetric recordsCountAverage = allMetrics.get(metrics.metricName("records-per-request-avg", metricGroup));
+
+ // send the fetch and then seek to a new offset
+ assertEquals(1, fetcher.sendFetches());
+ subscriptions.seek(tp2, 5);
+
+ MemoryRecordsBuilder builder = MemoryRecords.builder(ByteBuffer.allocate(1024), CompressionType.NONE,
+ TimestampType.CREATE_TIME, 0L);
+ for (int v = 0; v < 3; v++)
+ builder.appendWithOffset(v, RecordBatch.NO_TIMESTAMP, "key".getBytes(), ("value-" + v).getBytes());
+ MemoryRecords records = builder.build();
+
+ Map<TopicPartition, FetchResponse.PartitionData> partitions = new HashMap<>();
+ partitions.put(tp1, new FetchResponse.PartitionData(Errors.NONE, 100,
+ FetchResponse.INVALID_LAST_STABLE_OFFSET, 0L, null, records));
+ partitions.put(tp2, new FetchResponse.PartitionData(Errors.NONE, 100,
+ FetchResponse.INVALID_LAST_STABLE_OFFSET, 0L, null,
+ MemoryRecords.withRecords(CompressionType.NONE, new SimpleRecord("val".getBytes()))));
+
+ client.prepareResponse(new FetchResponse(new LinkedHashMap<>(partitions), 0));
+ consumerClient.poll(0);
+ fetcher.fetchedRecords();
+
+ // we should have ignored the record at the wrong offset
+ int expectedBytes = 0;
+ for (Record record : records.records())
+ expectedBytes += record.sizeInBytes();
+
+ assertEquals(expectedBytes, fetchSizeAverage.value(), EPSILON);
+ assertEquals(3, recordsCountAverage.value(), EPSILON);
+ }
+
private Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> fetchRecords(MemoryRecords records, Errors error, long hw, int throttleTime) {
assertEquals(1, fetcher.sendFetches());
client.prepareResponse(fetchResponse(records, error, hw, throttleTime));
@@ -905,7 +1035,7 @@ public class FetcherTest {
Long expectedOffsetForTp0,
Long expectedOffsetForTp1) {
client.reset();
- TopicPartition tp0 = tp;
+ TopicPartition tp0 = tp1;
TopicPartition tp1 = new TopicPartition(topicName, 1);
// Ensure metadata has both partition.
Cluster cluster = TestUtils.clusterWith(2, topicName, 2);
@@ -944,13 +1074,13 @@ public class FetcherTest {
@Override
public boolean matches(AbstractRequest body) {
ListOffsetRequest req = (ListOffsetRequest) body;
- return timestamp == req.partitionTimestamps().get(tp);
+ return timestamp == req.partitionTimestamps().get(tp1);
}
};
}
private ListOffsetResponse listOffsetResponse(Errors error, long timestamp, long offset) {
- return listOffsetResponse(tp, error, timestamp, offset);
+ return listOffsetResponse(tp1, error, timestamp, offset);
}
private ListOffsetResponse listOffsetResponse(TopicPartition tp, Errors error, long timestamp, long offset) {
@@ -961,7 +1091,7 @@ public class FetcherTest {
}
private FetchResponse fetchResponse(MemoryRecords records, Errors error, long hw, int throttleTime) {
- Map<TopicPartition, FetchResponse.PartitionData> partitions = Collections.singletonMap(tp,
+ Map<TopicPartition, FetchResponse.PartitionData> partitions = Collections.singletonMap(tp1,
new FetchResponse.PartitionData(error, hw, FetchResponse.INVALID_LAST_STABLE_OFFSET, 0L, null, records));
return new FetchResponse(new LinkedHashMap<>(partitions), throttleTime);
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/dd71e4a8/clients/src/test/java/org/apache/kafka/common/record/DefaultRecordBatchTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/record/DefaultRecordBatchTest.java b/clients/src/test/java/org/apache/kafka/common/record/DefaultRecordBatchTest.java
index b02c5c9..8466c83 100644
--- a/clients/src/test/java/org/apache/kafka/common/record/DefaultRecordBatchTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/record/DefaultRecordBatchTest.java
@@ -16,6 +16,7 @@
*/
package org.apache.kafka.common.record;
+import org.apache.kafka.common.utils.CloseableIterator;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.test.TestUtils;
import org.junit.Test;
@@ -207,4 +208,17 @@ public class DefaultRecordBatchTest {
assertEquals(ControlRecordType.ABORT, ControlRecordType.parse(abortRecord.key()));
}
+ @Test
+ public void testStreamingIteratorConsistency() {
+ MemoryRecords records = MemoryRecords.withRecords(RecordBatch.MAGIC_VALUE_V2, 0L,
+ CompressionType.GZIP, TimestampType.CREATE_TIME,
+ new SimpleRecord(1L, "a".getBytes(), "1".getBytes()),
+ new SimpleRecord(2L, "b".getBytes(), "2".getBytes()),
+ new SimpleRecord(3L, "c".getBytes(), "3".getBytes()));
+ DefaultRecordBatch batch = new DefaultRecordBatch(records.buffer());
+ try (CloseableIterator<Record> streamingIterator = batch.streamingIterator()) {
+ TestUtils.checkEquals(streamingIterator, batch.iterator());
+ }
+ }
+
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/dd71e4a8/clients/src/test/java/org/apache/kafka/test/TestUtils.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/test/TestUtils.java b/clients/src/test/java/org/apache/kafka/test/TestUtils.java
index ea857a0..1019282 100644
--- a/clients/src/test/java/org/apache/kafka/test/TestUtils.java
+++ b/clients/src/test/java/org/apache/kafka/test/TestUtils.java
@@ -33,6 +33,7 @@ import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
@@ -302,6 +303,10 @@ public class TestUtils {
assertEquals(toList(it1), toList(it2));
}
+ public static <T> void checkEquals(Iterator<T> it1, Iterator<T> it2) {
+ assertEquals(Utils.toList(it1), Utils.toList(it2));
+ }
+
public static <T> List<T> toList(Iterable<? extends T> iterable) {
List<T> list = new ArrayList<>();
for (T item : iterable)