Posted to commits@kafka.apache.org by jg...@apache.org on 2016/12/13 18:41:34 UTC
[7/9] kafka git commit: KAFKA-4390; Replace MessageSet usage with client-side alternatives
http://git-wip-us.apache.org/repos/asf/kafka/blob/67f1e5b9/clients/src/main/java/org/apache/kafka/common/record/RecordsIterator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/RecordsIterator.java b/clients/src/main/java/org/apache/kafka/common/record/RecordsIterator.java
index 1bc8a65..4a678d5 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/RecordsIterator.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/RecordsIterator.java
@@ -17,6 +17,7 @@
package org.apache.kafka.common.record;
import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.errors.CorruptRecordException;
import org.apache.kafka.common.utils.AbstractIterator;
import org.apache.kafka.common.utils.Utils;
@@ -25,51 +26,58 @@ import java.io.EOFException;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
+import java.util.Iterator;
+/**
+ * An iterator which handles both the shallow and deep iteration of record sets.
+ */
public class RecordsIterator extends AbstractIterator<LogEntry> {
- private final LogInputStream logStream;
private final boolean shallow;
+ private final boolean ensureMatchingMagic;
+ private final int maxRecordSize;
+ private final ShallowRecordsIterator<?> shallowIter;
private DeepRecordsIterator innerIter;
- public RecordsIterator(LogInputStream logStream, boolean shallow) {
- this.logStream = logStream;
+ public RecordsIterator(LogInputStream<?> logInputStream,
+ boolean shallow,
+ boolean ensureMatchingMagic,
+ int maxRecordSize) {
+ this.shallowIter = new ShallowRecordsIterator<>(logInputStream);
this.shallow = shallow;
+ this.ensureMatchingMagic = ensureMatchingMagic;
+ this.maxRecordSize = maxRecordSize;
}
- /*
- * Read the next record from the buffer.
- *
- * Note that in the compressed message set, each message value size is set as the size of the un-compressed
- * version of the message value, so when we do de-compression allocating an array of the specified size for
- * reading compressed value data is sufficient.
+ /**
+ * Get a shallow iterator over the given input stream.
+ * @param logInputStream The log input stream to read the entries from
+ * @param <T> The type of the log entry
+ * @return The shallow iterator.
*/
+ public static <T extends LogEntry> Iterator<T> shallowIterator(LogInputStream<T> logInputStream) {
+ return new ShallowRecordsIterator<>(logInputStream);
+ }
+
@Override
protected LogEntry makeNext() {
if (innerDone()) {
- try {
- LogEntry entry = logStream.nextEntry();
- // No more record to return.
- if (entry == null)
- return allDone();
-
- // decide whether to go shallow or deep iteration if it is compressed
- CompressionType compressionType = entry.record().compressionType();
- if (compressionType == CompressionType.NONE || shallow) {
- return entry;
- } else {
- // init the inner iterator with the value payload of the message,
- // which will de-compress the payload to a set of messages;
- // since we assume nested compression is not allowed, the deep iterator
- // would not try to further decompress underlying messages
- // There will be at least one element in the inner iterator, so we don't
- // need to call hasNext() here.
- innerIter = new DeepRecordsIterator(entry);
- return innerIter.next();
- }
- } catch (EOFException e) {
+ if (!shallowIter.hasNext())
return allDone();
- } catch (IOException e) {
- throw new KafkaException(e);
+
+ LogEntry entry = shallowIter.next();
+
+ // decide between shallow and deep iteration depending on whether the entry is compressed
+ if (shallow || !entry.isCompressed()) {
+ return entry;
+ } else {
+ // init the inner iterator with the value payload of the message,
+ // which will de-compress the payload to a set of messages;
+ // since we assume nested compression is not allowed, the deep iterator
+ // would not try to further decompress underlying messages
+ // There will be at least one element in the inner iterator, so we don't
+ // need to call hasNext() here.
+ innerIter = new DeepRecordsIterator(entry, ensureMatchingMagic, maxRecordSize);
+ return innerIter.next();
}
} else {
return innerIter.next();
@@ -80,38 +88,70 @@ public class RecordsIterator extends AbstractIterator<LogEntry> {
return innerIter == null || !innerIter.hasNext();
}
- private static class DataLogInputStream implements LogInputStream {
+ private static class DataLogInputStream implements LogInputStream<LogEntry> {
private final DataInputStream stream;
+ protected final int maxMessageSize;
- private DataLogInputStream(DataInputStream stream) {
+ DataLogInputStream(DataInputStream stream, int maxMessageSize) {
this.stream = stream;
+ this.maxMessageSize = maxMessageSize;
}
public LogEntry nextEntry() throws IOException {
- long offset = stream.readLong();
- int size = stream.readInt();
- if (size < 0)
- throw new IllegalStateException("Record with size " + size);
-
- byte[] recordBuffer = new byte[size];
- stream.readFully(recordBuffer, 0, size);
- ByteBuffer buf = ByteBuffer.wrap(recordBuffer);
- return new LogEntry(offset, new Record(buf));
+ try {
+ long offset = stream.readLong();
+ int size = stream.readInt();
+ if (size < Record.RECORD_OVERHEAD_V0)
+ throw new CorruptRecordException(String.format("Record size is less than the minimum record overhead (%d)", Record.RECORD_OVERHEAD_V0));
+ if (size > maxMessageSize)
+ throw new CorruptRecordException(String.format("Record size exceeds the largest allowable message size (%d).", maxMessageSize));
+
+ byte[] recordBuffer = new byte[size];
+ stream.readFully(recordBuffer, 0, size);
+ ByteBuffer buf = ByteBuffer.wrap(recordBuffer);
+ return LogEntry.create(offset, new Record(buf));
+ } catch (EOFException e) {
+ return null;
+ }
}
}
- private static class DeepRecordsIterator extends AbstractIterator<LogEntry> {
+ private static class ShallowRecordsIterator<T extends LogEntry> extends AbstractIterator<T> {
+ private final LogInputStream<T> logStream;
+
+ public ShallowRecordsIterator(LogInputStream<T> logStream) {
+ this.logStream = logStream;
+ }
+
+ @Override
+ protected T makeNext() {
+ try {
+ T entry = logStream.nextEntry();
+ if (entry == null)
+ return allDone();
+ return entry;
+ } catch (IOException e) {
+ throw new KafkaException(e);
+ }
+ }
+ }
+
+ public static class DeepRecordsIterator extends AbstractIterator<LogEntry> {
private final ArrayDeque<LogEntry> logEntries;
private final long absoluteBaseOffset;
+ private final byte wrapperMagic;
+
+ public DeepRecordsIterator(LogEntry wrapperEntry, boolean ensureMatchingMagic, int maxMessageSize) {
+ Record wrapperRecord = wrapperEntry.record();
+ this.wrapperMagic = wrapperRecord.magic();
- private DeepRecordsIterator(LogEntry entry) {
- CompressionType compressionType = entry.record().compressionType();
- ByteBuffer buffer = entry.record().value();
- DataInputStream stream = Compressor.wrapForInput(new ByteBufferInputStream(buffer), compressionType, entry.record().magic());
- LogInputStream logStream = new DataLogInputStream(stream);
+ CompressionType compressionType = wrapperRecord.compressionType();
+ ByteBuffer buffer = wrapperRecord.value();
+ DataInputStream stream = MemoryRecordsBuilder.wrapForInput(new ByteBufferInputStream(buffer), compressionType, wrapperRecord.magic());
+ LogInputStream logStream = new DataLogInputStream(stream, maxMessageSize);
- long wrapperRecordOffset = entry.offset();
- long wrapperRecordTimestamp = entry.record().timestamp();
+ long wrapperRecordOffset = wrapperEntry.offset();
+ long wrapperRecordTimestamp = wrapperRecord.timestamp();
this.logEntries = new ArrayDeque<>();
// If relative offset is used, we need to decompress the entire message first to compute
@@ -119,22 +159,27 @@ public class RecordsIterator extends AbstractIterator<LogEntry> {
// do the same for message format version 0
try {
while (true) {
- try {
- LogEntry logEntry = logStream.nextEntry();
- if (entry.record().magic() > Record.MAGIC_VALUE_V0) {
- Record recordWithTimestamp = new Record(
- logEntry.record().buffer(),
- wrapperRecordTimestamp,
- entry.record().timestampType()
- );
- logEntry = new LogEntry(logEntry.offset(), recordWithTimestamp);
- }
- logEntries.add(logEntry);
- } catch (EOFException e) {
+ LogEntry logEntry = logStream.nextEntry();
+ if (logEntry == null)
break;
+
+ Record record = logEntry.record();
+ byte magic = record.magic();
+
+ if (ensureMatchingMagic && magic != wrapperMagic)
+ throw new InvalidRecordException("Compressed message magic does not match wrapper magic");
+
+ if (magic > Record.MAGIC_VALUE_V0) {
+ Record recordWithTimestamp = new Record(
+ record.buffer(),
+ wrapperRecordTimestamp,
+ wrapperRecord.timestampType()
+ );
+ logEntry = LogEntry.create(logEntry.offset(), recordWithTimestamp);
}
+ logEntries.addLast(logEntry);
}
- if (entry.record().magic() > Record.MAGIC_VALUE_V0)
+ if (wrapperMagic > Record.MAGIC_VALUE_V0)
this.absoluteBaseOffset = wrapperRecordOffset - logEntries.getLast().offset();
else
this.absoluteBaseOffset = -1;
@@ -155,12 +200,10 @@ public class RecordsIterator extends AbstractIterator<LogEntry> {
// Convert offset to absolute offset if needed.
if (absoluteBaseOffset >= 0) {
long absoluteOffset = absoluteBaseOffset + entry.offset();
- entry = new LogEntry(absoluteOffset, entry.record());
+ entry = LogEntry.create(absoluteOffset, entry.record());
}
- // decide whether to go shallow or deep iteration if it is compressed
- CompressionType compression = entry.record().compressionType();
- if (compression != CompressionType.NONE)
+ if (entry.isCompressed())
throw new InvalidRecordException("Inner messages must not be compressed");
return entry;
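
For readers following the API change, here is a minimal sketch (not part of the patch; the sketch class name is invented, while the record APIs are those shown in the hunks above) of the shallow/deep distinction the reworked iterator implements: shallow iteration yields wrapper entries exactly as they sit in the log, while deep iteration decompresses a compressed wrapper into its inner entries with absolute offsets.

    import java.nio.ByteBuffer;
    import java.util.Iterator;

    import org.apache.kafka.common.record.CompressionType;
    import org.apache.kafka.common.record.LogEntry;
    import org.apache.kafka.common.record.MemoryRecords;
    import org.apache.kafka.common.record.MemoryRecordsBuilder;
    import org.apache.kafka.common.record.TimestampType;

    public class IterationSketch {
        public static void main(String[] args) {
            // Build a compressed record set with two inner records
            MemoryRecordsBuilder builder = MemoryRecords.builder(
                    ByteBuffer.allocate(1024), CompressionType.GZIP, TimestampType.CREATE_TIME);
            builder.append(0L, 0L, "k".getBytes(), "v-0".getBytes());
            builder.append(1L, 0L, "k".getBytes(), "v-1".getBytes());
            MemoryRecords records = builder.build();

            // Shallow iteration: a single compressed wrapper entry
            Iterator<? extends LogEntry> shallow = records.shallowIterator();
            while (shallow.hasNext())
                System.out.println("wrapper offset: " + shallow.next().offset());

            // Deep iteration: the two inner entries, offsets made absolute
            Iterator<LogEntry> deep = records.deepIterator();
            while (deep.hasNext())
                System.out.println("inner offset: " + deep.next().offset());
        }
    }
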
http://git-wip-us.apache.org/repos/asf/kafka/blob/67f1e5b9/clients/src/main/java/org/apache/kafka/common/record/TimestampType.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/TimestampType.java b/clients/src/main/java/org/apache/kafka/common/record/TimestampType.java
index 62fd814..55c966a 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/TimestampType.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/TimestampType.java
@@ -27,6 +27,7 @@ public enum TimestampType {
public final int id;
public final String name;
+
TimestampType(int id, String name) {
this.id = id;
this.name = name;
http://git-wip-us.apache.org/repos/asf/kafka/blob/67f1e5b9/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
index c3c1045..c5e6716 100755
--- a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
+++ b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
@@ -266,6 +266,24 @@ public class Utils {
}
/**
+ * Convert a ByteBuffer to a nullable array.
+ * @param buffer The buffer to convert
+ * @return The resulting array or null if the buffer is null
+ */
+ public static byte[] toNullableArray(ByteBuffer buffer) {
+ return buffer == null ? null : toArray(buffer);
+ }
+
+ /**
+ * Wrap an array as a nullable ByteBuffer.
+ * @param array The nullable array to wrap
+ * @return The wrapping ByteBuffer or null if array is null
+ */
+ public static ByteBuffer wrapNullable(byte[] array) {
+ return array == null ? null : ByteBuffer.wrap(array);
+ }
+
+ /**
* Read a byte array from the given offset and size in the buffer
*/
public static byte[] toArray(ByteBuffer buffer, int offset, int size) {
@@ -733,4 +751,37 @@ public class Utils {
public static int longHashcode(long value) {
return (int) (value ^ (value >>> 32));
}
+
+ /**
+ * Read a size-delimited byte buffer starting at the given offset.
+ * @param buffer Buffer containing the size and data
+ * @param start Offset in the buffer to read from
+ * @return A slice of the buffer containing only the delimited data (excluding the size), or null if the size prefix is negative
+ */
+ public static ByteBuffer sizeDelimited(ByteBuffer buffer, int start) {
+ int size = buffer.getInt(start);
+ if (size < 0) {
+ return null;
+ } else {
+ ByteBuffer b = buffer.duplicate();
+ b.position(start + 4);
+ b = b.slice();
+ b.limit(size);
+ b.rewind();
+ return b;
+ }
+ }
+
+ /**
+ * Compute the checksum of a range of data
+ * @param buffer Buffer containing the data to checksum
+ * @param start Offset in the buffer to read from
+ * @param size The number of bytes to include
+ */
+ public static long computeChecksum(ByteBuffer buffer, int start, int size) {
+ Crc32 crc = new Crc32();
+ crc.update(buffer.array(), buffer.arrayOffset() + start, size);
+ return crc.getValue();
+ }
+
}
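
A hedged usage sketch of the new Utils helpers (the wrapper class here is illustrative; the four methods are those added above): wrapNullable and toNullableArray round-trip null safely, sizeDelimited slices a payload prefixed by a 4-byte length, and computeChecksum CRC32s a byte range of an array-backed buffer.

    import java.nio.ByteBuffer;

    import org.apache.kafka.common.utils.Utils;

    public class UtilsSketch {
        public static void main(String[] args) {
            // Null-safe conversions in both directions
            byte[] missing = Utils.toNullableArray((ByteBuffer) null); // -> null
            ByteBuffer wrapped = Utils.wrapNullable("abc".getBytes()); // -> non-null buffer

            // A size-delimited value: a 4-byte length prefix followed by the payload
            ByteBuffer buf = ByteBuffer.allocate(8);
            buf.putInt(3).put(new byte[] {1, 2, 3});
            ByteBuffer payload = Utils.sizeDelimited(buf, 0);  // 3-byte slice of the payload
            long crc = Utils.computeChecksum(buf, 4, 3);       // CRC32 over the payload bytes

            System.out.println(missing == null);               // true
            System.out.println(wrapped.remaining());           // 3
            System.out.println(payload.remaining());           // 3
            System.out.println(Long.toHexString(crc));
        }
    }
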
http://git-wip-us.apache.org/repos/asf/kafka/blob/67f1e5b9/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
index ad6c127..a4386f8 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
@@ -39,6 +39,8 @@ import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.record.MemoryRecords;
+import org.apache.kafka.common.record.MemoryRecordsBuilder;
+import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.requests.AbstractRequest;
import org.apache.kafka.common.requests.FetchResponse;
import org.apache.kafka.common.requests.FetchResponse.PartitionData;
@@ -1323,11 +1325,10 @@ public class KafkaConsumerTest {
TopicPartition partition = fetchEntry.getKey();
long fetchOffset = fetchEntry.getValue().offset;
int fetchCount = fetchEntry.getValue().count;
- MemoryRecords records = MemoryRecords.emptyRecords(ByteBuffer.allocate(1024), CompressionType.NONE);
+ MemoryRecordsBuilder records = MemoryRecords.builder(ByteBuffer.allocate(1024), CompressionType.NONE, TimestampType.CREATE_TIME);
for (int i = 0; i < fetchCount; i++)
records.append(fetchOffset + i, 0L, ("key-" + i).getBytes(), ("value-" + i).getBytes());
- records.close();
- tpResponses.put(partition, new FetchResponse.PartitionData(Errors.NONE.code(), 0, records));
+ tpResponses.put(partition, new FetchResponse.PartitionData(Errors.NONE.code(), 0, records.build()));
}
return new FetchResponse(tpResponses, 0);
}
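
The same migration pattern recurs throughout the patch: a writable MemoryRecords plus close() is replaced by a MemoryRecordsBuilder plus build(). A minimal before/after sketch (the wrapper class and literal values are illustrative; the builder calls are those used in the hunk above):

    import java.nio.ByteBuffer;

    import org.apache.kafka.common.record.CompressionType;
    import org.apache.kafka.common.record.MemoryRecords;
    import org.apache.kafka.common.record.MemoryRecordsBuilder;
    import org.apache.kafka.common.record.TimestampType;

    public class BuilderMigrationSketch {
        public static MemoryRecords buildRecords() {
            // Before the patch:
            //   MemoryRecords records = MemoryRecords.emptyRecords(buffer, CompressionType.NONE);
            //   records.append(offset, timestamp, key, value);
            //   records.close();
            // After: the builder accumulates appends and build() yields an
            // immutable MemoryRecords ready to hand to a FetchResponse.
            MemoryRecordsBuilder builder = MemoryRecords.builder(
                    ByteBuffer.allocate(1024), CompressionType.NONE, TimestampType.CREATE_TIME);
            builder.append(0L, 0L, "key-0".getBytes(), "value-0".getBytes());
            return builder.build();
        }
    }
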
http://git-wip-us.apache.org/repos/asf/kafka/blob/67f1e5b9/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
index 6d5896f..15075cb 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
@@ -37,10 +37,12 @@ import org.apache.kafka.common.errors.TopicAuthorizationException;
import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.record.ByteBufferOutputStream;
import org.apache.kafka.common.record.CompressionType;
-import org.apache.kafka.common.record.Compressor;
import org.apache.kafka.common.record.MemoryRecords;
+import org.apache.kafka.common.record.MemoryRecordsBuilder;
import org.apache.kafka.common.record.Record;
+import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.requests.AbstractRequest;
import org.apache.kafka.common.requests.FetchRequest;
import org.apache.kafka.common.requests.FetchResponse;
@@ -93,8 +95,8 @@ public class FetcherTest {
private static final double EPSILON = 0.0001;
private ConsumerNetworkClient consumerClient = new ConsumerNetworkClient(client, metadata, time, 100, 1000);
- private MemoryRecords records = MemoryRecords.emptyRecords(ByteBuffer.allocate(1024), CompressionType.NONE);
- private MemoryRecords nextRecords = MemoryRecords.emptyRecords(ByteBuffer.allocate(1024), CompressionType.NONE);
+ private MemoryRecords records;
+ private MemoryRecords nextRecords;
private Fetcher<byte[], byte[]> fetcher = createFetcher(subscriptions, metrics);
private Metrics fetcherMetrics = new Metrics(time);
private Fetcher<byte[], byte[]> fetcherNoAutoReset = createFetcher(subscriptionsNoAutoReset, fetcherMetrics);
@@ -104,14 +106,16 @@ public class FetcherTest {
metadata.update(cluster, time.milliseconds());
client.setNode(node);
- records.append(1L, 0L, "key".getBytes(), "value-1".getBytes());
- records.append(2L, 0L, "key".getBytes(), "value-2".getBytes());
- records.append(3L, 0L, "key".getBytes(), "value-3".getBytes());
- records.close();
+ MemoryRecordsBuilder builder = MemoryRecords.builder(ByteBuffer.allocate(1024), CompressionType.NONE, TimestampType.CREATE_TIME);
+ builder.append(1L, 0L, "key".getBytes(), "value-1".getBytes());
+ builder.append(2L, 0L, "key".getBytes(), "value-2".getBytes());
+ builder.append(3L, 0L, "key".getBytes(), "value-3".getBytes());
+ records = builder.build();
- nextRecords.append(4L, 0L, "key".getBytes(), "value-4".getBytes());
- nextRecords.append(5L, 0L, "key".getBytes(), "value-5".getBytes());
- nextRecords.close();
+ builder = MemoryRecords.builder(ByteBuffer.allocate(1024), CompressionType.NONE, TimestampType.CREATE_TIME);
+ builder.append(4L, 0L, "key".getBytes(), "value-4".getBytes());
+ builder.append(5L, 0L, "key".getBytes(), "value-5".getBytes());
+ nextRecords = builder.build();
}
@After
@@ -129,7 +133,7 @@ public class FetcherTest {
assertEquals(1, fetcher.sendFetches());
assertFalse(fetcher.hasCompletedFetches());
- client.prepareResponse(fetchResponse(this.records.buffer(), Errors.NONE.code(), 100L, 0));
+ client.prepareResponse(fetchResponse(this.records, Errors.NONE.code(), 100L, 0));
consumerClient.poll(0);
assertTrue(fetcher.hasCompletedFetches());
@@ -154,7 +158,7 @@ public class FetcherTest {
assertEquals(1, fetcher.sendFetches());
assertFalse(fetcher.hasCompletedFetches());
- client.prepareResponse(fetchResponse(this.records.buffer(), Errors.NOT_LEADER_FOR_PARTITION.code(), 100L, 0));
+ client.prepareResponse(fetchResponse(this.records, Errors.NOT_LEADER_FOR_PARTITION.code(), 100L, 0));
consumerClient.poll(0);
assertTrue(fetcher.hasCompletedFetches());
@@ -192,7 +196,7 @@ public class FetcherTest {
subscriptions.assignFromUser(singleton(tp));
subscriptions.seek(tp, 1);
- client.prepareResponse(matchesOffset(tp, 1), fetchResponse(this.records.buffer(), Errors.NONE.code(), 100L, 0));
+ client.prepareResponse(matchesOffset(tp, 1), fetchResponse(this.records, Errors.NONE.code(), 100L, 0));
assertEquals(1, fetcher.sendFetches());
consumerClient.poll(0);
@@ -206,29 +210,30 @@ public class FetcherTest {
}
@Test
- public void testParseInvalidRecord() {
+ public void testParseInvalidRecord() throws Exception {
ByteBuffer buffer = ByteBuffer.allocate(1024);
- Compressor compressor = new Compressor(buffer, CompressionType.NONE);
+ ByteBufferOutputStream out = new ByteBufferOutputStream(buffer);
+ byte magic = Record.CURRENT_MAGIC_VALUE;
byte[] key = "foo".getBytes();
byte[] value = "baz".getBytes();
long offset = 0;
long timestamp = 500L;
int size = Record.recordSize(key, value);
- long crc = Record.computeChecksum(timestamp, key, value, CompressionType.NONE, 0, -1);
+ byte attributes = Record.computeAttributes(magic, CompressionType.NONE, TimestampType.CREATE_TIME);
+ long crc = Record.computeChecksum(magic, attributes, timestamp, key, value);
// write one valid record
- compressor.putLong(offset);
- compressor.putInt(size);
- Record.write(compressor, crc, Record.computeAttributes(CompressionType.NONE), timestamp, key, value, 0, -1);
+ out.writeLong(offset);
+ out.writeInt(size);
+ Record.write(out, magic, crc, Record.computeAttributes(magic, CompressionType.NONE, TimestampType.CREATE_TIME), timestamp, key, value);
// and one invalid record (note the crc)
- compressor.putLong(offset);
- compressor.putInt(size);
- Record.write(compressor, crc + 1, Record.computeAttributes(CompressionType.NONE), timestamp, key, value, 0, -1);
+ out.writeLong(offset);
+ out.writeInt(size);
+ Record.write(out, magic, crc + 1, Record.computeAttributes(magic, CompressionType.NONE, TimestampType.CREATE_TIME), timestamp, key, value);
- compressor.close();
buffer.flip();
subscriptions.assignFromUser(singleton(tp));
@@ -236,7 +241,7 @@ public class FetcherTest {
// normal fetch
assertEquals(1, fetcher.sendFetches());
- client.prepareResponse(fetchResponse(buffer, Errors.NONE.code(), 100L, 0));
+ client.prepareResponse(fetchResponse(MemoryRecords.readableRecords(buffer), Errors.NONE.code(), 100L, 0));
consumerClient.poll(0);
try {
fetcher.fetchedRecords();
@@ -255,8 +260,8 @@ public class FetcherTest {
subscriptions.assignFromUser(singleton(tp));
subscriptions.seek(tp, 1);
- client.prepareResponse(matchesOffset(tp, 1), fetchResponse(this.records.buffer(), Errors.NONE.code(), 100L, 0));
- client.prepareResponse(matchesOffset(tp, 4), fetchResponse(this.nextRecords.buffer(), Errors.NONE.code(), 100L, 0));
+ client.prepareResponse(matchesOffset(tp, 1), fetchResponse(this.records, Errors.NONE.code(), 100L, 0));
+ client.prepareResponse(matchesOffset(tp, 4), fetchResponse(this.nextRecords, Errors.NONE.code(), 100L, 0));
assertEquals(1, fetcher.sendFetches());
consumerClient.poll(0);
@@ -287,11 +292,11 @@ public class FetcherTest {
// if we are fetching from a compacted topic, there may be gaps in the returned records
// this test verifies the fetcher updates the current fetched/consumed positions correctly for this case
- MemoryRecords records = MemoryRecords.emptyRecords(ByteBuffer.allocate(1024), CompressionType.NONE);
- records.append(15L, 0L, "key".getBytes(), "value-1".getBytes());
- records.append(20L, 0L, "key".getBytes(), "value-2".getBytes());
- records.append(30L, 0L, "key".getBytes(), "value-3".getBytes());
- records.close();
+ MemoryRecordsBuilder builder = MemoryRecords.builder(ByteBuffer.allocate(1024), CompressionType.NONE, TimestampType.CREATE_TIME);
+ builder.append(15L, 0L, "key".getBytes(), "value-1".getBytes());
+ builder.append(20L, 0L, "key".getBytes(), "value-2".getBytes());
+ builder.append(30L, 0L, "key".getBytes(), "value-3".getBytes());
+ MemoryRecords records = builder.build();
List<ConsumerRecord<byte[], byte[]>> consumerRecords;
subscriptions.assignFromUser(singleton(tp));
@@ -299,7 +304,7 @@ public class FetcherTest {
// normal fetch
assertEquals(1, fetcher.sendFetches());
- client.prepareResponse(fetchResponse(records.buffer(), Errors.NONE.code(), 100L, 0));
+ client.prepareResponse(fetchResponse(records, Errors.NONE.code(), 100L, 0));
consumerClient.poll(0);
consumerRecords = fetcher.fetchedRecords().get(tp);
assertEquals(3, consumerRecords.size());
@@ -317,7 +322,7 @@ public class FetcherTest {
// resize the limit of the buffer to pretend it is only fetch-size large
assertEquals(1, fetcher.sendFetches());
- client.prepareResponse(fetchResponse(this.records.buffer(), Errors.TOPIC_AUTHORIZATION_FAILED.code(), 100L, 0));
+ client.prepareResponse(fetchResponse(this.records, Errors.TOPIC_AUTHORIZATION_FAILED.code(), 100L, 0));
consumerClient.poll(0);
try {
fetcher.fetchedRecords();
@@ -337,7 +342,7 @@ public class FetcherTest {
// Now the rebalance happens and fetch positions are cleared
subscriptions.assignFromSubscribed(singleton(tp));
- client.prepareResponse(fetchResponse(this.records.buffer(), Errors.NONE.code(), 100L, 0));
+ client.prepareResponse(fetchResponse(this.records, Errors.NONE.code(), 100L, 0));
consumerClient.poll(0);
// The active fetch should be ignored since its position is no longer valid
@@ -352,7 +357,7 @@ public class FetcherTest {
assertEquals(1, fetcher.sendFetches());
subscriptions.pause(tp);
- client.prepareResponse(fetchResponse(this.records.buffer(), Errors.NONE.code(), 100L, 0));
+ client.prepareResponse(fetchResponse(this.records, Errors.NONE.code(), 100L, 0));
consumerClient.poll(0);
assertNull(fetcher.fetchedRecords().get(tp));
}
@@ -373,7 +378,7 @@ public class FetcherTest {
subscriptions.seek(tp, 0);
assertEquals(1, fetcher.sendFetches());
- client.prepareResponse(fetchResponse(this.records.buffer(), Errors.NOT_LEADER_FOR_PARTITION.code(), 100L, 0));
+ client.prepareResponse(fetchResponse(this.records, Errors.NOT_LEADER_FOR_PARTITION.code(), 100L, 0));
consumerClient.poll(0);
assertEquals(0, fetcher.fetchedRecords().size());
assertEquals(0L, metadata.timeToNextUpdate(time.milliseconds()));
@@ -385,7 +390,7 @@ public class FetcherTest {
subscriptions.seek(tp, 0);
assertEquals(1, fetcher.sendFetches());
- client.prepareResponse(fetchResponse(this.records.buffer(), Errors.UNKNOWN_TOPIC_OR_PARTITION.code(), 100L, 0));
+ client.prepareResponse(fetchResponse(this.records, Errors.UNKNOWN_TOPIC_OR_PARTITION.code(), 100L, 0));
consumerClient.poll(0);
assertEquals(0, fetcher.fetchedRecords().size());
assertEquals(0L, metadata.timeToNextUpdate(time.milliseconds()));
@@ -397,7 +402,7 @@ public class FetcherTest {
subscriptions.seek(tp, 0);
assertEquals(1, fetcher.sendFetches());
- client.prepareResponse(fetchResponse(this.records.buffer(), Errors.OFFSET_OUT_OF_RANGE.code(), 100L, 0));
+ client.prepareResponse(fetchResponse(this.records, Errors.OFFSET_OUT_OF_RANGE.code(), 100L, 0));
consumerClient.poll(0);
assertEquals(0, fetcher.fetchedRecords().size());
assertTrue(subscriptions.isOffsetResetNeeded(tp));
@@ -412,7 +417,7 @@ public class FetcherTest {
subscriptions.seek(tp, 0);
assertEquals(1, fetcher.sendFetches());
- client.prepareResponse(fetchResponse(this.records.buffer(), Errors.OFFSET_OUT_OF_RANGE.code(), 100L, 0));
+ client.prepareResponse(fetchResponse(this.records, Errors.OFFSET_OUT_OF_RANGE.code(), 100L, 0));
subscriptions.seek(tp, 1);
consumerClient.poll(0);
assertEquals(0, fetcher.fetchedRecords().size());
@@ -426,7 +431,7 @@ public class FetcherTest {
subscriptionsNoAutoReset.seek(tp, 0);
assertTrue(fetcherNoAutoReset.sendFetches() > 0);
- client.prepareResponse(fetchResponse(this.records.buffer(), Errors.OFFSET_OUT_OF_RANGE.code(), 100L, 0));
+ client.prepareResponse(fetchResponse(this.records, Errors.OFFSET_OUT_OF_RANGE.code(), 100L, 0));
consumerClient.poll(0);
assertFalse(subscriptionsNoAutoReset.isOffsetResetNeeded(tp));
subscriptionsNoAutoReset.seek(tp, 2);
@@ -439,7 +444,7 @@ public class FetcherTest {
subscriptionsNoAutoReset.seek(tp, 0);
fetcherNoAutoReset.sendFetches();
- client.prepareResponse(fetchResponse(this.records.buffer(), Errors.OFFSET_OUT_OF_RANGE.code(), 100L, 0));
+ client.prepareResponse(fetchResponse(this.records, Errors.OFFSET_OUT_OF_RANGE.code(), 100L, 0));
consumerClient.poll(0);
assertFalse(subscriptionsNoAutoReset.isOffsetResetNeeded(tp));
@@ -459,7 +464,7 @@ public class FetcherTest {
subscriptions.seek(tp, 0);
assertEquals(1, fetcher.sendFetches());
- client.prepareResponse(fetchResponse(this.records.buffer(), Errors.NONE.code(), 100L, 0), true);
+ client.prepareResponse(fetchResponse(this.records, Errors.NONE.code(), 100L, 0), true);
consumerClient.poll(0);
assertEquals(0, fetcher.fetchedRecords().size());
@@ -611,14 +616,14 @@ public class FetcherTest {
// We need to make sure the message offset grows. Otherwise they will be considered as already consumed
// and filtered out by consumer.
if (i > 1) {
- this.records = MemoryRecords.emptyRecords(ByteBuffer.allocate(1024), CompressionType.NONE);
+ MemoryRecordsBuilder builder = MemoryRecords.builder(ByteBuffer.allocate(1024), CompressionType.NONE, TimestampType.CREATE_TIME);
for (int v = 0; v < 3; v++) {
- this.records.append((long) i * 3 + v, Record.NO_TIMESTAMP, "key".getBytes(), String.format("value-%d", v).getBytes());
+ builder.append((long) i * 3 + v, Record.NO_TIMESTAMP, "key".getBytes(), String.format("value-%d", v).getBytes());
}
- this.records.close();
+ this.records = builder.build();
}
assertEquals(1, fetcher.sendFetches());
- client.prepareResponse(fetchResponse(this.records.buffer(), Errors.NONE.code(), 100L, 100 * i));
+ client.prepareResponse(fetchResponse(this.records, Errors.NONE.code(), 100L, 100 * i));
consumerClient.poll(0);
records = fetcher.fetchedRecords().get(tp);
assertEquals(3, records.size());
@@ -722,8 +727,7 @@ public class FetcherTest {
return new ListOffsetResponse(allPartitionData, 1);
}
- private FetchResponse fetchResponse(ByteBuffer buffer, short error, long hw, int throttleTime) {
- MemoryRecords records = MemoryRecords.readableRecords(buffer);
+ private FetchResponse fetchResponse(MemoryRecords records, short error, long hw, int throttleTime) {
return new FetchResponse(Collections.singletonMap(tp, new FetchResponse.PartitionData(error, hw, records)), throttleTime);
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/67f1e5b9/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
index 28521e8..4f25bdf 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
@@ -12,23 +12,6 @@
*/
package org.apache.kafka.clients.producer.internals;
-import static java.util.Arrays.asList;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.Deque;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.atomic.AtomicInteger;
-
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.Cluster;
@@ -45,6 +28,23 @@ import org.apache.kafka.common.utils.Time;
import org.junit.After;
import org.junit.Test;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Deque;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static java.util.Arrays.asList;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
public class RecordAccumulatorTest {
private String topic = "test";
@@ -84,7 +84,7 @@ public class RecordAccumulatorTest {
accum.append(tp1, 0L, key, value, null, maxBlockTimeMs);
Deque<RecordBatch> partitionBatches = accum.batches().get(tp1);
assertEquals(1, partitionBatches.size());
- assertTrue(partitionBatches.peekFirst().records.isWritable());
+ assertTrue(partitionBatches.peekFirst().isWritable());
assertEquals("No partitions should be ready.", 0, accum.ready(cluster, now).readyNodes.size());
}
@@ -93,15 +93,15 @@ public class RecordAccumulatorTest {
Deque<RecordBatch> partitionBatches = accum.batches().get(tp1);
assertEquals(2, partitionBatches.size());
Iterator<RecordBatch> partitionBatchesIterator = partitionBatches.iterator();
- assertFalse(partitionBatchesIterator.next().records.isWritable());
- assertTrue(partitionBatchesIterator.next().records.isWritable());
+ assertFalse(partitionBatchesIterator.next().isWritable());
+ assertTrue(partitionBatchesIterator.next().isWritable());
assertEquals("Our partition's leader should be ready", Collections.singleton(node1), accum.ready(cluster, time.milliseconds()).readyNodes);
List<RecordBatch> batches = accum.drain(cluster, Collections.singleton(node1), Integer.MAX_VALUE, 0).get(node1.id());
assertEquals(1, batches.size());
RecordBatch batch = batches.get(0);
- Iterator<LogEntry> iter = batch.records.iterator();
+ Iterator<LogEntry> iter = batch.records().deepIterator();
for (int i = 0; i < appends; i++) {
LogEntry entry = iter.next();
assertEquals("Keys should match", ByteBuffer.wrap(key), entry.record().key());
@@ -130,7 +130,7 @@ public class RecordAccumulatorTest {
assertEquals(1, batches.size());
RecordBatch batch = batches.get(0);
- Iterator<LogEntry> iter = batch.records.iterator();
+ Iterator<LogEntry> iter = batch.records().deepIterator();
LogEntry entry = iter.next();
assertEquals("Keys should match", ByteBuffer.wrap(key), entry.record().key());
assertEquals("Values should match", ByteBuffer.wrap(value), entry.record().value());
@@ -159,7 +159,7 @@ public class RecordAccumulatorTest {
final int msgs = 10000;
final int numParts = 2;
final RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, CompressionType.NONE, 0L, 100L, metrics, time);
- List<Thread> threads = new ArrayList<Thread>();
+ List<Thread> threads = new ArrayList<>();
for (int i = 0; i < numThreads; i++) {
threads.add(new Thread() {
public void run() {
@@ -182,8 +182,11 @@ public class RecordAccumulatorTest {
List<RecordBatch> batches = accum.drain(cluster, nodes, 5 * 1024, 0).get(node1.id());
if (batches != null) {
for (RecordBatch batch : batches) {
- for (LogEntry entry : batch.records)
+ Iterator<LogEntry> deepEntries = batch.records().deepIterator();
+ while (deepEntries.hasNext()) {
+ deepEntries.next();
read++;
+ }
accum.deallocate(batch);
}
}
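
Since RecordBatch no longer exposes an iterable records field, callers fetch the built records and walk them with deepIterator(), which lazily decompresses wrapper entries. A sketch of the replacement counting pattern used above (the helper class is illustrative):

    import java.util.Iterator;

    import org.apache.kafka.common.record.LogEntry;
    import org.apache.kafka.common.record.MemoryRecords;

    public class DrainSketch {
        // Count the deep (decompressed) entries in a drained batch's records.
        static int countDeepEntries(MemoryRecords records) {
            int read = 0;
            Iterator<LogEntry> deepEntries = records.deepIterator();
            while (deepEntries.hasNext()) {
                deepEntries.next();
                read++;
            }
            return read;
        }
    }
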
http://git-wip-us.apache.org/repos/asf/kafka/blob/67f1e5b9/clients/src/test/java/org/apache/kafka/common/record/ByteBufferLogInputStreamTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/record/ByteBufferLogInputStreamTest.java b/clients/src/test/java/org/apache/kafka/common/record/ByteBufferLogInputStreamTest.java
new file mode 100644
index 0000000..62e8a05
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/common/record/ByteBufferLogInputStreamTest.java
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+package org.apache.kafka.common.record;
+
+import org.junit.Test;
+
+import java.nio.ByteBuffer;
+import java.util.Iterator;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class ByteBufferLogInputStreamTest {
+
+ @Test
+ public void iteratorIgnoresIncompleteEntries() {
+ ByteBuffer buffer = ByteBuffer.allocate(2048);
+ MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, Record.MAGIC_VALUE_V1, CompressionType.NONE, TimestampType.CREATE_TIME, 0L);
+ builder.append(0L, 15L, "a".getBytes(), "1".getBytes());
+ builder.append(1L, 20L, "b".getBytes(), "2".getBytes());
+
+ ByteBuffer recordsBuffer = builder.build().buffer();
+ recordsBuffer.limit(recordsBuffer.limit() - 5);
+
+ Iterator<ByteBufferLogInputStream.ByteBufferLogEntry> iterator = MemoryRecords.readableRecords(recordsBuffer).shallowIterator();
+ assertTrue(iterator.hasNext());
+ ByteBufferLogInputStream.ByteBufferLogEntry first = iterator.next();
+ assertEquals(0L, first.offset());
+
+ assertFalse(iterator.hasNext());
+ }
+
+ @Test
+ public void testSetCreateTimeV1() {
+ ByteBuffer buffer = ByteBuffer.allocate(2048);
+ MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, Record.MAGIC_VALUE_V1, CompressionType.NONE, TimestampType.CREATE_TIME, 0L);
+ builder.append(0L, 15L, "a".getBytes(), "1".getBytes());
+ Iterator<ByteBufferLogInputStream.ByteBufferLogEntry> iterator = builder.build().shallowIterator();
+
+ assertTrue(iterator.hasNext());
+ ByteBufferLogInputStream.ByteBufferLogEntry entry = iterator.next();
+
+ long createTimeMs = 20L;
+ entry.setCreateTime(createTimeMs);
+
+ assertEquals(TimestampType.CREATE_TIME, entry.record().timestampType());
+ assertEquals(createTimeMs, entry.record().timestamp());
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testSetCreateTimeNotAllowedV0() {
+ ByteBuffer buffer = ByteBuffer.allocate(2048);
+ MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, Record.MAGIC_VALUE_V0, CompressionType.NONE, TimestampType.CREATE_TIME, 0L);
+ builder.append(0L, 15L, "a".getBytes(), "1".getBytes());
+ Iterator<ByteBufferLogInputStream.ByteBufferLogEntry> iterator = builder.build().shallowIterator();
+
+ assertTrue(iterator.hasNext());
+ ByteBufferLogInputStream.ByteBufferLogEntry entry = iterator.next();
+
+ long createTimeMs = 20L;
+ entry.setCreateTime(createTimeMs);
+ }
+
+ @Test
+ public void testSetLogAppendTimeV1() {
+ ByteBuffer buffer = ByteBuffer.allocate(2048);
+ MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, Record.MAGIC_VALUE_V1, CompressionType.NONE, TimestampType.CREATE_TIME, 0L);
+ builder.append(0L, 15L, "a".getBytes(), "1".getBytes());
+ Iterator<ByteBufferLogInputStream.ByteBufferLogEntry> iterator = builder.build().shallowIterator();
+
+ assertTrue(iterator.hasNext());
+ ByteBufferLogInputStream.ByteBufferLogEntry entry = iterator.next();
+
+ long logAppendTime = 20L;
+ entry.setLogAppendTime(logAppendTime);
+
+ assertEquals(TimestampType.LOG_APPEND_TIME, entry.record().timestampType());
+ assertEquals(logAppendTime, entry.record().timestamp());
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testSetLogAppendTimeNotAllowedV0() {
+ ByteBuffer buffer = ByteBuffer.allocate(2048);
+ MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, Record.MAGIC_VALUE_V0, CompressionType.NONE, TimestampType.CREATE_TIME, 0L);
+ builder.append(0L, 15L, "a".getBytes(), "1".getBytes());
+ Iterator<ByteBufferLogInputStream.ByteBufferLogEntry> iterator = builder.build().shallowIterator();
+
+ assertTrue(iterator.hasNext());
+ ByteBufferLogInputStream.ByteBufferLogEntry entry = iterator.next();
+
+ long logAppendTime = 20L;
+ entry.setLogAppendTime(logAppendTime);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/67f1e5b9/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java b/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java
new file mode 100644
index 0000000..7e2c256
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java
@@ -0,0 +1,410 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+package org.apache.kafka.common.record;
+
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.test.TestUtils;
+import org.easymock.EasyMock;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+
+import static org.apache.kafka.test.TestUtils.tempFile;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+public class FileRecordsTest {
+
+ private Record[] records = new Record[] {
+ Record.create("abcd".getBytes()),
+ Record.create("efgh".getBytes()),
+ Record.create("ijkl".getBytes())
+ };
+ private FileRecords fileRecords;
+
+ @Before
+ public void setup() throws IOException {
+ this.fileRecords = createFileRecords(records);
+ }
+
+ /**
+ * Test that the cached size variable matches the actual file size as we append messages
+ */
+ @Test
+ public void testFileSize() throws IOException {
+ assertEquals(fileRecords.channel().size(), fileRecords.sizeInBytes());
+ for (int i = 0; i < 20; i++) {
+ fileRecords.append(MemoryRecords.withRecords(Record.create("abcd".getBytes())));
+ assertEquals(fileRecords.channel().size(), fileRecords.sizeInBytes());
+ }
+ }
+
+ /**
+ * Test that adding invalid bytes to the end of the log doesn't break iteration
+ */
+ @Test
+ public void testIterationOverPartialAndTruncation() throws IOException {
+ testPartialWrite(0, fileRecords);
+ testPartialWrite(2, fileRecords);
+ testPartialWrite(4, fileRecords);
+ testPartialWrite(5, fileRecords);
+ testPartialWrite(6, fileRecords);
+ }
+
+ private void testPartialWrite(int size, FileRecords fileRecords) throws IOException {
+ ByteBuffer buffer = ByteBuffer.allocate(size);
+ for (int i = 0; i < size; i++)
+ buffer.put((byte) 0);
+
+ buffer.rewind();
+
+ fileRecords.channel().write(buffer);
+
+ // appending those bytes should not change the contents
+ TestUtils.checkEquals(Arrays.asList(records).iterator(), fileRecords.records());
+ }
+
+ /**
+ * Iterating over the file does file reads but shouldn't change the position of the underlying FileChannel.
+ */
+ @Test
+ public void testIterationDoesntChangePosition() throws IOException {
+ long position = fileRecords.channel().position();
+ TestUtils.checkEquals(Arrays.asList(records).iterator(), fileRecords.records());
+ assertEquals(position, fileRecords.channel().position());
+ }
+
+ /**
+ * Test a simple append and read.
+ */
+ @Test
+ public void testRead() throws IOException {
+ FileRecords read = fileRecords.read(0, fileRecords.sizeInBytes());
+ TestUtils.checkEquals(fileRecords.shallowIterator(), read.shallowIterator());
+
+ List<LogEntry> items = shallowEntries(read);
+ LogEntry second = items.get(1);
+
+ read = fileRecords.read(second.sizeInBytes(), fileRecords.sizeInBytes());
+ assertEquals("Try a read starting from the second message",
+ items.subList(1, 3), shallowEntries(read));
+
+ read = fileRecords.read(second.sizeInBytes(), second.sizeInBytes());
+ assertEquals("Try a read of a single message starting from the second message",
+ Collections.singletonList(second), shallowEntries(read));
+ }
+
+ /**
+ * Test the MessageSet.searchFor API.
+ */
+ @Test
+ public void testSearch() throws IOException {
+ // append a new message with a high offset
+ Record lastMessage = Record.create("test".getBytes());
+ fileRecords.append(MemoryRecords.withRecords(50L, lastMessage));
+
+ List<LogEntry> entries = shallowEntries(fileRecords);
+ int position = 0;
+
+ int message1Size = entries.get(0).sizeInBytes();
+ assertEquals("Should be able to find the first message by its offset",
+ new FileRecords.LogEntryPosition(0L, position, message1Size),
+ fileRecords.searchForOffsetWithSize(0, 0));
+ position += message1Size;
+
+ int message2Size = entries.get(1).sizeInBytes();
+ assertEquals("Should be able to find second message when starting from 0",
+ new FileRecords.LogEntryPosition(1L, position, message2Size),
+ fileRecords.searchForOffsetWithSize(1, 0));
+ assertEquals("Should be able to find second message starting from its offset",
+ new FileRecords.LogEntryPosition(1L, position, message2Size),
+ fileRecords.searchForOffsetWithSize(1, position));
+ position += message2Size + entries.get(2).sizeInBytes();
+
+ int message4Size = entries.get(3).sizeInBytes();
+ assertEquals("Should be able to find fourth message from a non-existant offset",
+ new FileRecords.LogEntryPosition(50L, position, message4Size),
+ fileRecords.searchForOffsetWithSize(3, position));
+ assertEquals("Should be able to find fourth message by correct offset",
+ new FileRecords.LogEntryPosition(50L, position, message4Size),
+ fileRecords.searchForOffsetWithSize(50, position));
+ }
+
+ /**
+ * Test that the message set iterator obeys start and end slicing
+ */
+ @Test
+ public void testIteratorWithLimits() throws IOException {
+ LogEntry entry = shallowEntries(fileRecords).get(1);
+ int start = fileRecords.searchForOffsetWithSize(1, 0).position;
+ int size = entry.sizeInBytes();
+ FileRecords slice = fileRecords.read(start, size);
+ assertEquals(Collections.singletonList(entry), shallowEntries(slice));
+ FileRecords slice2 = fileRecords.read(start, size - 1);
+ assertEquals(Collections.emptyList(), shallowEntries(slice2));
+ }
+
+ /**
+ * Test the truncateTo method lops off messages and appropriately updates the size
+ */
+ @Test
+ public void testTruncate() throws IOException {
+ LogEntry entry = shallowEntries(fileRecords).get(0);
+ int end = fileRecords.searchForOffsetWithSize(1, 0).position;
+ fileRecords.truncateTo(end);
+ assertEquals(Collections.singletonList(entry), shallowEntries(fileRecords));
+ assertEquals(entry.sizeInBytes(), fileRecords.sizeInBytes());
+ }
+
+ /**
+ * Test that truncateTo only calls truncate on the FileChannel if the size of the
+ * FileChannel is bigger than the target size. This is important because some JVMs
+ * change the mtime of the file, even if truncate should do nothing.
+ */
+ @Test
+ public void testTruncateNotCalledIfSizeIsSameAsTargetSize() throws IOException {
+ FileChannel channelMock = EasyMock.createMock(FileChannel.class);
+
+ EasyMock.expect(channelMock.size()).andReturn(42L).atLeastOnce();
+ EasyMock.expect(channelMock.position(42L)).andReturn(null);
+ EasyMock.replay(channelMock);
+
+ FileRecords fileRecords = new FileRecords(tempFile(), channelMock, 0, Integer.MAX_VALUE, false);
+ fileRecords.truncateTo(42);
+
+ EasyMock.verify(channelMock);
+ }
+
+ /**
+ * Expect a KafkaException if targetSize is bigger than the size of
+ * the FileRecords.
+ */
+ @Test
+ public void testTruncateNotCalledIfSizeIsBiggerThanTargetSize() throws IOException {
+ FileChannel channelMock = EasyMock.createMock(FileChannel.class);
+
+ EasyMock.expect(channelMock.size()).andReturn(42L).atLeastOnce();
+ EasyMock.expect(channelMock.position(42L)).andReturn(null);
+ EasyMock.replay(channelMock);
+
+ FileRecords fileRecords = new FileRecords(tempFile(), channelMock, 0, Integer.MAX_VALUE, false);
+
+ try {
+ fileRecords.truncateTo(43);
+ fail("Should throw KafkaException");
+ } catch (KafkaException e) {
+ // expected
+ }
+
+ EasyMock.verify(channelMock);
+ }
+
+ /**
+ * see #testTruncateNotCalledIfSizeIsSameAsTargetSize
+ */
+ @Test
+ public void testTruncateIfSizeIsDifferentToTargetSize() throws IOException {
+ FileChannel channelMock = EasyMock.createMock(FileChannel.class);
+
+ EasyMock.expect(channelMock.size()).andReturn(42L).atLeastOnce();
+ EasyMock.expect(channelMock.position(42L)).andReturn(null).once();
+ EasyMock.expect(channelMock.truncate(23L)).andReturn(null).once();
+ EasyMock.expect(channelMock.position(23L)).andReturn(null).once();
+ EasyMock.replay(channelMock);
+
+ FileRecords fileRecords = new FileRecords(tempFile(), channelMock, 0, Integer.MAX_VALUE, false);
+ fileRecords.truncateTo(23);
+
+ EasyMock.verify(channelMock);
+ }
+
+ /**
+ * Test the new FileRecords with preallocate set to true
+ */
+ @Test
+ public void testPreallocateTrue() throws IOException {
+ File temp = tempFile();
+ FileRecords fileRecords = FileRecords.open(temp, false, 512 * 1024 * 1024, true);
+ long position = fileRecords.channel().position();
+ int size = fileRecords.sizeInBytes();
+ assertEquals(0, position);
+ assertEquals(0, size);
+ assertEquals(512 * 1024 * 1024, temp.length());
+ }
+
+ /**
+ * Test the new FileRecords with preallocate set to false
+ */
+ @Test
+ public void testPreallocateFalse() throws IOException {
+ File temp = tempFile();
+ FileRecords set = FileRecords.open(temp, false, 512 * 1024 * 1024, false);
+ long position = set.channel().position();
+ int size = set.sizeInBytes();
+ assertEquals(0, position);
+ assertEquals(0, size);
+ assertEquals(0, temp.length());
+ }
+
+ /**
+ * Test the new FileRecords with preallocate set to true and a file that has been cleanly shut down; the file will be truncated to the end of the valid data.
+ */
+ @Test
+ public void testPreallocateClearShutdown() throws IOException {
+ File temp = tempFile();
+ FileRecords set = FileRecords.open(temp, false, 512 * 1024 * 1024, true);
+ set.append(MemoryRecords.withRecords(records));
+
+ int oldPosition = (int) set.channel().position();
+ int oldSize = set.sizeInBytes();
+ assertEquals(fileRecords.sizeInBytes(), oldPosition);
+ assertEquals(fileRecords.sizeInBytes(), oldSize);
+ set.close();
+
+ File tempReopen = new File(temp.getAbsolutePath());
+ FileRecords setReopen = FileRecords.open(tempReopen, true, 512 * 1024 * 1024, true);
+ int position = (int) setReopen.channel().position();
+ int size = setReopen.sizeInBytes();
+
+ assertEquals(oldPosition, position);
+ assertEquals(oldPosition, size);
+ assertEquals(oldPosition, tempReopen.length());
+ }
+
+ @Test
+ public void testFormatConversionWithPartialMessage() throws IOException {
+ LogEntry entry = shallowEntries(fileRecords).get(1);
+ int start = fileRecords.searchForOffsetWithSize(1, 0).position;
+ int size = entry.sizeInBytes();
+ FileRecords slice = fileRecords.read(start, size - 1);
+ Records messageV0 = slice.toMessageFormat(Record.MAGIC_VALUE_V0);
+ assertTrue("No message should be there", shallowEntries(messageV0).isEmpty());
+ assertEquals("There should be " + (size - 1) + " bytes", size - 1, messageV0.sizeInBytes());
+ }
+
+ @Test
+ public void testConvertNonCompressedToMagic1() throws IOException {
+ List<LogEntry> entries = Arrays.asList(
+ LogEntry.create(0L, Record.create(Record.MAGIC_VALUE_V0, Record.NO_TIMESTAMP, "k1".getBytes(), "hello".getBytes())),
+ LogEntry.create(2L, Record.create(Record.MAGIC_VALUE_V0, Record.NO_TIMESTAMP, "k2".getBytes(), "goodbye".getBytes())));
+ MemoryRecords records = MemoryRecords.withLogEntries(CompressionType.NONE, entries);
+
+ // Up conversion. In reality we only do down conversion, but up conversion should work as well.
+ // up conversion for non-compressed messages
+ try (FileRecords fileRecords = FileRecords.open(tempFile())) {
+ fileRecords.append(records);
+ fileRecords.flush();
+ Records convertedRecords = fileRecords.toMessageFormat(Record.MAGIC_VALUE_V1);
+ verifyConvertedMessageSet(entries, convertedRecords, Record.MAGIC_VALUE_V1);
+ }
+ }
+
+ @Test
+ public void testConvertCompressedToMagic1() throws IOException {
+ List<LogEntry> entries = Arrays.asList(
+ LogEntry.create(0L, Record.create(Record.MAGIC_VALUE_V0, Record.NO_TIMESTAMP, "k1".getBytes(), "hello".getBytes())),
+ LogEntry.create(2L, Record.create(Record.MAGIC_VALUE_V0, Record.NO_TIMESTAMP, "k2".getBytes(), "goodbye".getBytes())));
+ MemoryRecords records = MemoryRecords.withLogEntries(CompressionType.GZIP, entries);
+
+ // up conversion for compressed messages
+ try (FileRecords fileRecords = FileRecords.open(tempFile())) {
+ fileRecords.append(records);
+ fileRecords.flush();
+ Records convertedRecords = fileRecords.toMessageFormat(Record.MAGIC_VALUE_V1);
+ verifyConvertedMessageSet(entries, convertedRecords, Record.MAGIC_VALUE_V1);
+ }
+ }
+
+ @Test
+ public void testConvertNonCompressedToMagic0() throws IOException {
+ List<LogEntry> entries = Arrays.asList(
+ LogEntry.create(0L, Record.create(Record.MAGIC_VALUE_V1, 1L, "k1".getBytes(), "hello".getBytes())),
+ LogEntry.create(2L, Record.create(Record.MAGIC_VALUE_V1, 2L, "k2".getBytes(), "goodbye".getBytes())));
+ MemoryRecords records = MemoryRecords.withLogEntries(CompressionType.NONE, entries);
+
+ // down conversion for non-compressed messages
+ try (FileRecords fileRecords = FileRecords.open(tempFile())) {
+ fileRecords.append(records);
+ fileRecords.flush();
+ Records convertedRecords = fileRecords.toMessageFormat(Record.MAGIC_VALUE_V0);
+ verifyConvertedMessageSet(entries, convertedRecords, Record.MAGIC_VALUE_V0);
+ }
+ }
+
+ @Test
+ public void testConvertCompressedToMagic0() throws IOException {
+ List<LogEntry> entries = Arrays.asList(
+ LogEntry.create(0L, Record.create(Record.MAGIC_VALUE_V1, 1L, "k1".getBytes(), "hello".getBytes())),
+ LogEntry.create(2L, Record.create(Record.MAGIC_VALUE_V1, 2L, "k2".getBytes(), "goodbye".getBytes())));
+ MemoryRecords records = MemoryRecords.withLogEntries(CompressionType.GZIP, entries);
+
+ // down conversion for compressed messages
+ try (FileRecords fileRecords = FileRecords.open(tempFile())) {
+ fileRecords.append(records);
+ fileRecords.flush();
+ Records convertedRecords = fileRecords.toMessageFormat(Record.MAGIC_VALUE_V0);
+ verifyConvertedMessageSet(entries, convertedRecords, Record.MAGIC_VALUE_V0);
+ }
+ }
+
+ private void verifyConvertedMessageSet(List<LogEntry> initialEntries, Records convertedRecords, byte magicByte) {
+ int i = 0;
+ for (LogEntry logEntry : deepEntries(convertedRecords)) {
+ assertEquals("magic byte should be " + magicByte, magicByte, logEntry.record().magic());
+ assertEquals("offset should not change", initialEntries.get(i).offset(), logEntry.offset());
+ assertEquals("key should not change", initialEntries.get(i).record().key(), logEntry.record().key());
+ assertEquals("payload should not change", initialEntries.get(i).record().value(), logEntry.record().value());
+ i += 1;
+ }
+ }
+
+ private static List<LogEntry> shallowEntries(Records buffer) {
+ List<LogEntry> entries = new ArrayList<>();
+ Iterator<? extends LogEntry> iterator = buffer.shallowIterator();
+ while (iterator.hasNext())
+ entries.add(iterator.next());
+ return entries;
+ }
+
+ private static List<LogEntry> deepEntries(Records buffer) {
+ List<LogEntry> entries = new ArrayList<>();
+ Iterator<? extends LogEntry> iterator = buffer.shallowIterator();
+ while (iterator.hasNext()) {
+ for (LogEntry deepEntry : iterator.next())
+ entries.add(deepEntry);
+ }
+ return entries;
+ }
+
+ private FileRecords createFileRecords(Record ... records) throws IOException {
+ FileRecords fileRecords = FileRecords.open(tempFile());
+ fileRecords.append(MemoryRecords.withRecords(records));
+ fileRecords.flush();
+ return fileRecords;
+ }
+
+}
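
For reference, the conversion path these tests exercise, in one self-contained sketch (the class and method names of the wrapper are illustrative; FileRecords.open, append, flush, and toMessageFormat are from the hunks above): write a v1 entry to a file segment and down-convert it to v0.

    import java.io.File;
    import java.io.IOException;

    import org.apache.kafka.common.record.FileRecords;
    import org.apache.kafka.common.record.MemoryRecords;
    import org.apache.kafka.common.record.Record;
    import org.apache.kafka.common.record.Records;

    public class ConversionSketch {
        // Append a v1 record to a file segment and down-convert it to the v0 format.
        static void downConvertDemo(File file) throws IOException {
            try (FileRecords fileRecords = FileRecords.open(file)) {
                fileRecords.append(MemoryRecords.withRecords(
                        Record.create(Record.MAGIC_VALUE_V1, 1L, "k".getBytes(), "v".getBytes())));
                fileRecords.flush();
                Records v0 = fileRecords.toMessageFormat(Record.MAGIC_VALUE_V0);
                System.out.println(v0.sizeInBytes());
            }
        }
    }
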
http://git-wip-us.apache.org/repos/asf/kafka/blob/67f1e5b9/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java
new file mode 100644
index 0000000..40fa212
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java
@@ -0,0 +1,253 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+package org.apache.kafka.common.record;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+
+@RunWith(value = Parameterized.class)
+public class MemoryRecordsBuilderTest {
+
+ private final CompressionType compressionType;
+ private final int bufferOffset;
+
+ public MemoryRecordsBuilderTest(int bufferOffset, CompressionType compressionType) {
+ this.bufferOffset = bufferOffset;
+ this.compressionType = compressionType;
+ }
+
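+ // compressionRate() should report the ratio of compressed to uncompressed bytes;
+ // the test recomputes both sides from the appended records to cross-check it.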
+ @Test
+ public void testCompressionRateV0() {
+ ByteBuffer buffer = ByteBuffer.allocate(1024);
+ buffer.position(bufferOffset);
+
+ Record[] records = new Record[] {
+ Record.create(Record.MAGIC_VALUE_V0, 0L, "a".getBytes(), "1".getBytes()),
+ Record.create(Record.MAGIC_VALUE_V0, 1L, "b".getBytes(), "2".getBytes()),
+ Record.create(Record.MAGIC_VALUE_V0, 2L, "c".getBytes(), "3".getBytes()),
+ };
+
+ MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, Record.MAGIC_VALUE_V0, compressionType,
+ TimestampType.CREATE_TIME, 0L, 0L, buffer.capacity());
+
+ int uncompressedSize = 0;
+ long offset = 0L;
+ for (Record record : records) {
+ uncompressedSize += record.sizeInBytes() + Records.LOG_OVERHEAD;
+ builder.append(offset++, record);
+ }
+
+ MemoryRecords built = builder.build();
+ if (compressionType == CompressionType.NONE) {
+ assertEquals(1.0, builder.compressionRate(), 0.00001);
+ } else {
+ int compressedSize = built.sizeInBytes() - Records.LOG_OVERHEAD - Record.RECORD_OVERHEAD_V0;
+ double computedCompressionRate = (double) compressedSize / uncompressedSize;
+ assertEquals(computedCompressionRate, builder.compressionRate(), 0.00001);
+ }
+ }
+
+ @Test
+ public void testCompressionRateV1() {
+ ByteBuffer buffer = ByteBuffer.allocate(1024);
+ buffer.position(bufferOffset);
+
+ Record[] records = new Record[] {
+ Record.create(Record.MAGIC_VALUE_V1, 0L, "a".getBytes(), "1".getBytes()),
+ Record.create(Record.MAGIC_VALUE_V1, 1L, "b".getBytes(), "2".getBytes()),
+ Record.create(Record.MAGIC_VALUE_V1, 2L, "c".getBytes(), "3".getBytes()),
+ };
+
+ MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, Record.MAGIC_VALUE_V1, compressionType,
+ TimestampType.CREATE_TIME, 0L, 0L, buffer.capacity());
+
+ int uncompressedSize = 0;
+ long offset = 0L;
+ for (Record record : records) {
+ uncompressedSize += record.sizeInBytes() + Records.LOG_OVERHEAD;
+ builder.append(offset++, record);
+ }
+
+ MemoryRecords built = builder.build();
+ if (compressionType == CompressionType.NONE) {
+ assertEquals(1.0, builder.compressionRate(), 0.00001);
+ } else {
+ int compressedSize = built.sizeInBytes() - Records.LOG_OVERHEAD - Record.RECORD_OVERHEAD_V1;
+ double computedCompressionRate = (double) compressedSize / uncompressedSize;
+ assertEquals(computedCompressionRate, builder.compressionRate(), 0.00001);
+ }
+ }
+
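+ // Under LOG_APPEND_TIME, the builder stamps every record with the configured
+ // append time, overriding the timestamps passed to append().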
+ @Test
+ public void buildUsingLogAppendTime() {
+ ByteBuffer buffer = ByteBuffer.allocate(1024);
+ buffer.position(bufferOffset);
+
+ long logAppendTime = System.currentTimeMillis();
+ MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, Record.MAGIC_VALUE_V1, compressionType,
+ TimestampType.LOG_APPEND_TIME, 0L, logAppendTime, buffer.capacity());
+ builder.append(0L, 0L, "a".getBytes(), "1".getBytes());
+ builder.append(1L, 0L, "b".getBytes(), "2".getBytes());
+ builder.append(2L, 0L, "c".getBytes(), "3".getBytes());
+ MemoryRecords records = builder.build();
+
+ MemoryRecordsBuilder.RecordsInfo info = builder.info();
+ assertEquals(logAppendTime, info.maxTimestamp);
+
+ assertEquals(2L, info.shallowOffsetOfMaxTimestamp);
+
+ Iterator<Record> iterator = records.records();
+ while (iterator.hasNext()) {
+ Record record = iterator.next();
+ assertEquals(TimestampType.LOG_APPEND_TIME, record.timestampType());
+ assertEquals(logAppendTime, record.timestamp());
+ }
+ }
+
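+ // convertAndAppend() rewrites the v0 input records with the builder's v1 magic;
+ // under LOG_APPEND_TIME the converted records also pick up the append time.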
+ @Test
+ public void convertUsingLogAppendTime() {
+ ByteBuffer buffer = ByteBuffer.allocate(1024);
+ buffer.position(bufferOffset);
+
+ long logAppendTime = System.currentTimeMillis();
+ MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, Record.MAGIC_VALUE_V1, compressionType,
+ TimestampType.LOG_APPEND_TIME, 0L, logAppendTime, buffer.capacity());
+
+ builder.convertAndAppend(0L, Record.create(Record.MAGIC_VALUE_V0, 0L, "a".getBytes(), "1".getBytes()));
+ builder.convertAndAppend(1L, Record.create(Record.MAGIC_VALUE_V0, 0L, "b".getBytes(), "2".getBytes()));
+ builder.convertAndAppend(2L, Record.create(Record.MAGIC_VALUE_V0, 0L, "c".getBytes(), "3".getBytes()));
+ MemoryRecords records = builder.build();
+
+ MemoryRecordsBuilder.RecordsInfo info = builder.info();
+ assertEquals(logAppendTime, info.maxTimestamp);
+
+ assertEquals(2L, info.shallowOffsetOfMaxTimestamp);
+
+ Iterator<Record> iterator = records.records();
+ while (iterator.hasNext()) {
+ Record record = iterator.next();
+ assertEquals(TimestampType.LOG_APPEND_TIME, record.timestampType());
+ assertEquals(logAppendTime, record.timestamp());
+ }
+ }
+
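+ // Under CREATE_TIME the client-supplied timestamps are preserved. For compressed
+ // sets the shallow offset of the max timestamp is the wrapper's offset (i.e. the
+ // last inner offset), hence the compression-dependent expectation below.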
+ @Test
+ public void buildUsingCreateTime() {
+ ByteBuffer buffer = ByteBuffer.allocate(1024);
+ buffer.position(bufferOffset);
+
+ long logAppendTime = System.currentTimeMillis();
+ MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, Record.MAGIC_VALUE_V1, compressionType,
+ TimestampType.CREATE_TIME, 0L, logAppendTime, buffer.capacity());
+ builder.append(0L, 0L, "a".getBytes(), "1".getBytes());
+ builder.append(1L, 2L, "b".getBytes(), "2".getBytes());
+ builder.append(2L, 1L, "c".getBytes(), "3".getBytes());
+ MemoryRecords records = builder.build();
+
+ MemoryRecordsBuilder.RecordsInfo info = builder.info();
+ assertEquals(2L, info.maxTimestamp);
+
+ if (compressionType == CompressionType.NONE)
+ assertEquals(1L, info.shallowOffsetOfMaxTimestamp);
+ else
+ assertEquals(2L, info.shallowOffsetOfMaxTimestamp);
+
+ Iterator<Record> iterator = records.records();
+ int i = 0;
+ long[] expectedTimestamps = new long[] {0L, 2L, 1L};
+ while (iterator.hasNext()) {
+ Record record = iterator.next();
+ assertEquals(TimestampType.CREATE_TIME, record.timestampType());
+ assertEquals(expectedTimestamps[i++], record.timestamp());
+ }
+ }
+
+ @Test
+ public void writePastLimit() {
+ ByteBuffer buffer = ByteBuffer.allocate(64);
+ buffer.position(bufferOffset);
+
+ long logAppendTime = System.currentTimeMillis();
+ MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, Record.MAGIC_VALUE_V1, compressionType,
+ TimestampType.CREATE_TIME, 0L, logAppendTime, buffer.capacity());
+ builder.append(0L, 0L, "a".getBytes(), "1".getBytes());
+ builder.append(1L, 1L, "b".getBytes(), "2".getBytes());
+
+ assertFalse(builder.hasRoomFor("c".getBytes(), "3".getBytes()));
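+ // the limit is advisory: hasRoomFor() reports no space, but the append below
+ // still succeeds (the builder's underlying buffer is assumed to grow on demand)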
+ builder.append(2L, 2L, "c".getBytes(), "3".getBytes());
+ MemoryRecords records = builder.build();
+
+ MemoryRecordsBuilder.RecordsInfo info = builder.info();
+ assertEquals(2L, info.maxTimestamp);
+ assertEquals(2L, info.shallowOffsetOfMaxTimestamp);
+
+ Iterator<Record> iterator = records.records();
+ long i = 0L;
+ while (iterator.hasNext()) {
+ Record record = iterator.next();
+ assertEquals(TimestampType.CREATE_TIME, record.timestampType());
+ assertEquals(i++, record.timestamp());
+ }
+ }
+
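+ // v0 records carry no timestamp, so after up-conversion under CREATE_TIME the
+ // records surface Record.NO_TIMESTAMP rather than the conversion-time clock.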
+ @Test
+ public void convertUsingCreateTime() {
+ ByteBuffer buffer = ByteBuffer.allocate(1024);
+ buffer.position(bufferOffset);
+
+ long logAppendTime = System.currentTimeMillis();
+ MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, Record.MAGIC_VALUE_V1, compressionType,
+ TimestampType.CREATE_TIME, 0L, logAppendTime, buffer.capacity());
+
+ builder.convertAndAppend(0L, Record.create(Record.MAGIC_VALUE_V0, 0L, "a".getBytes(), "1".getBytes()));
+ builder.convertAndAppend(0L, Record.create(Record.MAGIC_VALUE_V0, 0L, "b".getBytes(), "2".getBytes()));
+ builder.convertAndAppend(0L, Record.create(Record.MAGIC_VALUE_V0, 0L, "c".getBytes(), "3".getBytes()));
+ MemoryRecords records = builder.build();
+
+ MemoryRecordsBuilder.RecordsInfo info = builder.info();
+ assertEquals(Record.NO_TIMESTAMP, info.maxTimestamp);
+ assertEquals(0L, info.shallowOffsetOfMaxTimestamp);
+
+ Iterator<Record> iterator = records.records();
+ while (iterator.hasNext()) {
+ Record record = iterator.next();
+ assertEquals(TimestampType.CREATE_TIME, record.timestampType());
+ assertEquals(Record.NO_TIMESTAMP, record.timestamp());
+ }
+ }
+
+ @Parameterized.Parameters
+ public static Collection<Object[]> data() {
+ List<Object[]> values = new ArrayList<>();
+ for (int bufferOffset : Arrays.asList(0, 15))
+ for (CompressionType compressionType : CompressionType.values())
+ values.add(new Object[] {bufferOffset, compressionType});
+ return values;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/67f1e5b9/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java
index b1117f1..ef0fbeb 100644
--- a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java
@@ -16,53 +16,64 @@
*/
package org.apache.kafka.common.record;
-import static org.apache.kafka.common.utils.Utils.toArray;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
+import org.apache.kafka.test.TestUtils;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
import java.nio.ByteBuffer;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
+import static java.util.Arrays.asList;
+import static org.apache.kafka.common.utils.Utils.toNullableArray;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
@RunWith(value = Parameterized.class)
public class MemoryRecordsTest {
private CompressionType compression;
+ private byte magic;
+ private long firstOffset;
- public MemoryRecordsTest(CompressionType compression) {
+ public MemoryRecordsTest(byte magic, long firstOffset, CompressionType compression) {
+ this.magic = magic;
this.compression = compression;
+ this.firstOffset = firstOffset;
}
@Test
public void testIterator() {
- MemoryRecords recs1 = MemoryRecords.emptyRecords(ByteBuffer.allocate(1024), compression);
- MemoryRecords recs2 = MemoryRecords.emptyRecords(ByteBuffer.allocate(1024), compression);
- List<Record> list = Arrays.asList(new Record(0L, "a".getBytes(), "1".getBytes()),
- new Record(0L, "b".getBytes(), "2".getBytes()),
- new Record(0L, "c".getBytes(), "3".getBytes()));
+ MemoryRecordsBuilder builder1 = MemoryRecords.builder(ByteBuffer.allocate(1024), magic, compression, TimestampType.CREATE_TIME, firstOffset);
+ MemoryRecordsBuilder builder2 = MemoryRecords.builder(ByteBuffer.allocate(1024), magic, compression, TimestampType.CREATE_TIME, firstOffset);
+ List<Record> list = asList(
+ Record.create(magic, 1L, "a".getBytes(), "1".getBytes()),
+ Record.create(magic, 2L, "b".getBytes(), "2".getBytes()),
+ Record.create(magic, 3L, "c".getBytes(), "3".getBytes()),
+ Record.create(magic, 4L, null, "4".getBytes()),
+ Record.create(magic, 5L, "e".getBytes(), null),
+ Record.create(magic, 6L, null, null));
+
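+ // both builders must produce the same logical entries, whether records are
+ // appended whole or rebuilt from (timestamp, key, value) parts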
for (int i = 0; i < list.size(); i++) {
Record r = list.get(i);
- recs1.append(i, r);
- recs2.append(i, 0L, toArray(r.key()), toArray(r.value()));
+ builder1.append(firstOffset + i, r);
+ builder2.append(firstOffset + i, i + 1, toNullableArray(r.key()), toNullableArray(r.value()));
}
- recs1.close();
- recs2.close();
+
+ MemoryRecords recs1 = builder1.build();
+ MemoryRecords recs2 = builder2.build();
for (int iteration = 0; iteration < 2; iteration++) {
- for (MemoryRecords recs : Arrays.asList(recs1, recs2)) {
- Iterator<LogEntry> iter = recs.iterator();
+ for (MemoryRecords recs : asList(recs1, recs2)) {
+ Iterator<LogEntry> iter = recs.deepIterator();
for (int i = 0; i < list.size(); i++) {
assertTrue(iter.hasNext());
LogEntry entry = iter.next();
- assertEquals((long) i, entry.offset());
+ assertEquals(firstOffset + i, entry.offset());
assertEquals(list.get(i), entry.record());
entry.record().ensureValid();
}
@@ -73,20 +84,145 @@ public class MemoryRecordsTest {
@Test
public void testHasRoomForMethod() {
- MemoryRecords recs1 = MemoryRecords.emptyRecords(ByteBuffer.allocate(1024), compression);
- recs1.append(0, new Record(0L, "a".getBytes(), "1".getBytes()));
+ MemoryRecordsBuilder builder = MemoryRecords.builder(ByteBuffer.allocate(1024), magic, compression, TimestampType.CREATE_TIME);
+ builder.append(0, Record.create(magic, 0L, "a".getBytes(), "1".getBytes()));
+
+ assertTrue(builder.hasRoomFor("b".getBytes(), "2".getBytes()));
+ builder.close();
+ assertFalse(builder.hasRoomFor("b".getBytes(), "2".getBytes()));
+ }
+
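+ // filterTo() copies only the entries accepted by the filter into the destination
+ // buffer. A retained compressed wrapper takes the offset of its last retained
+ // inner record, so the surviving shallow offsets differ by compression type.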
+ @Test
+ public void testFilterTo() {
+ ByteBuffer buffer = ByteBuffer.allocate(2048);
+ MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, magic, compression, TimestampType.CREATE_TIME);
+ builder.append(0L, 10L, null, "a".getBytes());
+ builder.close();
+
+ builder = MemoryRecords.builder(buffer, magic, compression, TimestampType.CREATE_TIME, 1L);
+ builder.append(1L, 11L, "1".getBytes(), "b".getBytes());
+ builder.append(2L, 12L, null, "c".getBytes());
+ builder.close();
- assertTrue(recs1.hasRoomFor("b".getBytes(), "2".getBytes()));
- recs1.close();
- assertFalse(recs1.hasRoomFor("b".getBytes(), "2".getBytes()));
+ builder = MemoryRecords.builder(buffer, magic, compression, TimestampType.CREATE_TIME, 3L);
+ builder.append(3L, 13L, null, "d".getBytes());
+ builder.append(4L, 20L, "4".getBytes(), "e".getBytes());
+ builder.append(5L, 15L, "5".getBytes(), "f".getBytes());
+ builder.close();
+
+ builder = MemoryRecords.builder(buffer, magic, compression, TimestampType.CREATE_TIME, 6L);
+ builder.append(6L, 16L, "6".getBytes(), "g".getBytes());
+ builder.close();
+
+ buffer.flip();
+
+ ByteBuffer filtered = ByteBuffer.allocate(2048);
+ MemoryRecords.FilterResult result = MemoryRecords.readableRecords(buffer).filterTo(new RetainNonNullKeysFilter(), filtered);
+
+ filtered.flip();
+
+ assertEquals(7, result.messagesRead);
+ assertEquals(4, result.messagesRetained);
+ assertEquals(buffer.limit(), result.bytesRead);
+ assertEquals(filtered.limit(), result.bytesRetained);
+ if (magic > 0) {
+ assertEquals(20L, result.maxTimestamp);
+ if (compression == CompressionType.NONE)
+ assertEquals(4L, result.shallowOffsetOfMaxTimestamp);
+ else
+ assertEquals(5L, result.shallowOffsetOfMaxTimestamp);
+ }
+ MemoryRecords filteredRecords = MemoryRecords.readableRecords(filtered);
+
+ List<ByteBufferLogInputStream.ByteBufferLogEntry> shallowEntries = TestUtils.toList(filteredRecords.shallowIterator());
+ List<Long> expectedOffsets = compression == CompressionType.NONE ? asList(1L, 4L, 5L, 6L) : asList(1L, 5L, 6L);
+ assertEquals(expectedOffsets.size(), shallowEntries.size());
+
+ for (int i = 0; i < expectedOffsets.size(); i++) {
+ LogEntry shallowEntry = shallowEntries.get(i);
+ assertEquals(expectedOffsets.get(i).longValue(), shallowEntry.offset());
+ assertEquals(magic, shallowEntry.record().magic());
+ assertEquals(compression, shallowEntry.record().compressionType());
+ assertEquals(magic == Record.MAGIC_VALUE_V0 ? TimestampType.NO_TIMESTAMP_TYPE : TimestampType.CREATE_TIME,
+ shallowEntry.record().timestampType());
+ }
+
+ List<LogEntry> deepEntries = TestUtils.toList(filteredRecords.deepIterator());
+ assertEquals(4, deepEntries.size());
+
+ LogEntry first = deepEntries.get(0);
+ assertEquals(1L, first.offset());
+ assertEquals(Record.create(magic, 11L, "1".getBytes(), "b".getBytes()), first.record());
+
+ LogEntry second = deepEntries.get(1);
+ assertEquals(4L, second.offset());
+ assertEquals(Record.create(magic, 20L, "4".getBytes(), "e".getBytes()), second.record());
+
+ LogEntry third = deepEntries.get(2);
+ assertEquals(5L, third.offset());
+ assertEquals(Record.create(magic, 15L, "5".getBytes(), "f".getBytes()), third.record());
+
+ LogEntry fourth = deepEntries.get(3);
+ assertEquals(6L, fourth.offset());
+ assertEquals(Record.create(magic, 16L, "6".getBytes(), "g".getBytes()), fourth.record());
+ }
+
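+ // filtering must not alter the timestamp type or the append time of the
+ // entries it retains when the log uses LOG_APPEND_TIME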
+ @Test
+ public void testFilterToPreservesLogAppendTime() {
+ long logAppendTime = System.currentTimeMillis();
+
+ ByteBuffer buffer = ByteBuffer.allocate(2048);
+ MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, magic, compression,
+ TimestampType.LOG_APPEND_TIME, 0L, logAppendTime);
+ builder.append(0L, 10L, null, "a".getBytes());
+ builder.close();
+
+ builder = MemoryRecords.builder(buffer, magic, compression, TimestampType.LOG_APPEND_TIME, 1L, logAppendTime);
+ builder.append(1L, 11L, "1".getBytes(), "b".getBytes());
+ builder.append(2L, 12L, null, "c".getBytes());
+ builder.close();
+
+ builder = MemoryRecords.builder(buffer, magic, compression, TimestampType.LOG_APPEND_TIME, 3L, logAppendTime);
+ builder.append(3L, 13L, null, "d".getBytes());
+ builder.append(4L, 14L, "4".getBytes(), "e".getBytes());
+ builder.append(5L, 15L, "5".getBytes(), "f".getBytes());
+ builder.close();
+
+ buffer.flip();
+
+ ByteBuffer filtered = ByteBuffer.allocate(2048);
+ MemoryRecords.readableRecords(buffer).filterTo(new RetainNonNullKeysFilter(), filtered);
+
+ filtered.flip();
+ MemoryRecords filteredRecords = MemoryRecords.readableRecords(filtered);
+
+ List<ByteBufferLogInputStream.ByteBufferLogEntry> shallowEntries = TestUtils.toList(filteredRecords.shallowIterator());
+ assertEquals(compression == CompressionType.NONE ? 3 : 2, shallowEntries.size());
+
+ for (LogEntry shallowEntry : shallowEntries) {
+ assertEquals(compression, shallowEntry.record().compressionType());
+ if (magic > Record.MAGIC_VALUE_V0) {
+ assertEquals(TimestampType.LOG_APPEND_TIME, shallowEntry.record().timestampType());
+ assertEquals(logAppendTime, shallowEntry.record().timestamp());
+ }
+ }
}
@Parameterized.Parameters
public static Collection<Object[]> data() {
- List<Object[]> values = new ArrayList<Object[]>();
- for (CompressionType type: CompressionType.values())
- values.add(new Object[] {type});
+ List<Object[]> values = new ArrayList<>();
+ for (long firstOffset : asList(0L, 57L))
+ for (byte magic : asList(Record.MAGIC_VALUE_V0, Record.MAGIC_VALUE_V1))
+ for (CompressionType type: CompressionType.values())
+ values.add(new Object[] {magic, firstOffset, type});
return values;
}
+
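+ // test filter that keeps only entries whose record carries a (non-null) key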
+ private static class RetainNonNullKeysFilter implements MemoryRecords.LogEntryFilter {
+ @Override
+ public boolean shouldRetain(LogEntry entry) {
+ return entry.record().hasKey();
+ }
+ }
}