You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2017/05/28 17:00:03 UTC
kafka git commit: KAFKA-5316;
LogCleaner should account for larger record sets after cleaning
Repository: kafka
Updated Branches:
refs/heads/trunk b50387eb7 -> dfa3c8a92
KAFKA-5316; LogCleaner should account for larger record sets after cleaning
Author: Jason Gustafson <ja...@confluent.io>
Reviewers: Ismael Juma <is...@juma.me.uk>, Jun Rao <ju...@gmail.com>
Closes #3142 from hachikuji/KAFKA-5316
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/dfa3c8a9
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/dfa3c8a9
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/dfa3c8a9
Branch: refs/heads/trunk
Commit: dfa3c8a92dddd58cab95e12c72669f250bb99683
Parents: b50387e
Author: Jason Gustafson <ja...@confluent.io>
Authored: Sun May 28 09:57:59 2017 -0700
Committer: Jason Gustafson <ja...@confluent.io>
Committed: Sun May 28 09:57:59 2017 -0700
----------------------------------------------------------------------
checkstyle/checkstyle.xml | 2 +-
.../producer/internals/ProducerBatch.java | 57 +++------
.../record/AbstractLegacyRecordBatch.java | 6 +
.../kafka/common/record/DefaultRecordBatch.java | 14 +--
.../kafka/common/record/FileLogInputStream.java | 7 --
.../apache/kafka/common/record/FileRecords.java | 21 +---
.../kafka/common/record/MemoryRecords.java | 110 ++++++++++------
.../common/record/MemoryRecordsBuilder.java | 124 ++++++++++++-------
.../kafka/common/record/MutableRecordBatch.java | 9 ++
.../common/utils/ByteBufferOutputStream.java | 45 ++++++-
.../org/apache/kafka/clients/MockClient.java | 15 ++-
.../internals/RecordAccumulatorTest.java | 8 +-
.../clients/producer/internals/SenderTest.java | 61 ++++++---
.../common/record/FileLogInputStreamTest.java | 2 +-
.../kafka/common/record/MemoryRecordsTest.java | 70 +++++++++--
.../utils/ByteBufferOutputStreamTest.java | 101 +++++++++++++++
core/src/main/scala/kafka/log/Log.scala | 2 +-
core/src/main/scala/kafka/log/LogCleaner.scala | 11 +-
core/src/main/scala/kafka/log/LogSegment.scala | 14 +--
.../scala/kafka/tools/DumpLogSegments.scala | 4 +-
.../scala/unit/kafka/log/LogSegmentTest.scala | 10 +-
.../src/test/scala/unit/kafka/log/LogTest.scala | 9 +-
22 files changed, 490 insertions(+), 212 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/dfa3c8a9/checkstyle/checkstyle.xml
----------------------------------------------------------------------
diff --git a/checkstyle/checkstyle.xml b/checkstyle/checkstyle.xml
index 743c68d..ccab85c 100644
--- a/checkstyle/checkstyle.xml
+++ b/checkstyle/checkstyle.xml
@@ -105,7 +105,7 @@
</module>
<module name="ClassDataAbstractionCoupling">
<!-- default is 7 -->
- <property name="max" value="17"/>
+ <property name="max" value="20"/>
</module>
<module name="BooleanExpressionComplexity">
<!-- default is 3 -->
http://git-wip-us.apache.org/repos/asf/kafka/blob/dfa3c8a9/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java
index df79707..974e230 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java
@@ -16,9 +16,6 @@
*/
package org.apache.kafka.clients.producer.internals;
-import java.util.ArrayDeque;
-import java.util.Deque;
-import java.util.Iterator;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.TopicPartition;
@@ -27,23 +24,24 @@ import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.record.AbstractRecords;
import org.apache.kafka.common.record.CompressionRatioEstimator;
-import org.apache.kafka.common.record.CompressionType;
+import org.apache.kafka.common.record.MemoryRecords;
+import org.apache.kafka.common.record.MemoryRecordsBuilder;
import org.apache.kafka.common.record.MutableRecordBatch;
import org.apache.kafka.common.record.Record;
import org.apache.kafka.common.record.RecordBatch;
-import org.apache.kafka.common.record.MemoryRecords;
-import org.apache.kafka.common.record.MemoryRecordsBuilder;
-import org.apache.kafka.common.record.Records;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.requests.ProduceResponse;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.nio.ByteBuffer;
-import java.util.concurrent.atomic.AtomicInteger;
+import java.util.ArrayDeque;
import java.util.ArrayList;
+import java.util.Deque;
+import java.util.Iterator;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
import static org.apache.kafka.common.record.RecordBatch.NO_TIMESTAMP;
@@ -119,9 +117,9 @@ public final class ProducerBatch {
}
/**
- + * This method is only used by {@link #split(int)} when splitting a large batch to smaller ones.
- + * @return true if the record has been successfully appended, false otherwise.
- + */
+ * This method is only used by {@link #split(int)} when splitting a large batch to smaller ones.
+ * @return true if the record has been successfully appended, false otherwise.
+ */
private boolean tryAppendForSplit(long timestamp, ByteBuffer key, ByteBuffer value, Header[] headers, Thunk thunk) {
if (!recordsBuilder.hasRoomFor(timestamp, key, value)) {
return false;
@@ -196,15 +194,13 @@ public final class ProducerBatch {
assert thunkIter.hasNext();
Thunk thunk = thunkIter.next();
if (batch == null) {
- batch = createBatchOffAccumulatorForRecord(this.topicPartition, this.recordsBuilder.compressionType(),
- record, splitBatchSize, this.createdMs);
+ batch = createBatchOffAccumulatorForRecord(record, splitBatchSize);
}
// A newly created batch can always host the first message.
if (!batch.tryAppendForSplit(record.timestamp(), record.key(), record.value(), record.headers(), thunk)) {
batches.add(batch);
- batch = createBatchOffAccumulatorForRecord(this.topicPartition, this.recordsBuilder.compressionType(),
- record, splitBatchSize, this.createdMs);
+ batch = createBatchOffAccumulatorForRecord(record, splitBatchSize);
batch.tryAppendForSplit(record.timestamp(), record.key(), record.value(), record.headers(), thunk);
}
}
@@ -217,30 +213,13 @@ public final class ProducerBatch {
return batches;
}
- private ProducerBatch createBatchOffAccumulatorForRecord(TopicPartition tp,
- CompressionType compressionType,
- Record record,
- int batchSize,
- long createdMs) {
- int initialSize = Math.max(Records.LOG_OVERHEAD + AbstractRecords.sizeInBytesUpperBound(magic(),
- record.key(),
- record.value(),
- record.headers()),
- batchSize);
- return createBatchOffAccumulator(tp, compressionType, initialSize, createdMs);
- }
-
- // package private for testing purpose.
- static ProducerBatch createBatchOffAccumulator(TopicPartition tp,
- CompressionType compressionType,
- int batchSize,
- long createdMs) {
- ByteBuffer buffer = ByteBuffer.allocate(batchSize);
- MemoryRecordsBuilder builder = MemoryRecords.builder(buffer,
- compressionType,
- TimestampType.CREATE_TIME,
- batchSize);
- return new ProducerBatch(tp, builder, createdMs, true);
+ private ProducerBatch createBatchOffAccumulatorForRecord(Record record, int batchSize) {
+ int initialSize = Math.max(AbstractRecords.sizeInBytesUpperBound(magic(),
+ record.key(), record.value(), record.headers()), batchSize);
+ ByteBuffer buffer = ByteBuffer.allocate(initialSize);
+ MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, magic(), recordsBuilder.compressionType(),
+ TimestampType.CREATE_TIME, 0L, recordsBuilder.isTransactional());
+ return new ProducerBatch(topicPartition, builder, this.createdMs, true);
}
/**
http://git-wip-us.apache.org/repos/asf/kafka/blob/dfa3c8a9/clients/src/main/java/org/apache/kafka/common/record/AbstractLegacyRecordBatch.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/AbstractLegacyRecordBatch.java b/clients/src/main/java/org/apache/kafka/common/record/AbstractLegacyRecordBatch.java
index e028988..be69686 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/AbstractLegacyRecordBatch.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/AbstractLegacyRecordBatch.java
@@ -21,6 +21,7 @@ import org.apache.kafka.common.errors.CorruptRecordException;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.utils.AbstractIterator;
import org.apache.kafka.common.utils.ByteBufferInputStream;
+import org.apache.kafka.common.utils.ByteBufferOutputStream;
import org.apache.kafka.common.utils.ByteUtils;
import org.apache.kafka.common.utils.CloseableIterator;
import org.apache.kafka.common.utils.Utils;
@@ -479,6 +480,11 @@ public abstract class AbstractLegacyRecordBatch extends AbstractRecordBatch impl
}
@Override
+ public void writeTo(ByteBufferOutputStream outputStream) {
+ outputStream.write(buffer.duplicate());
+ }
+
+ @Override
public boolean equals(Object o) {
if (this == o)
return true;
http://git-wip-us.apache.org/repos/asf/kafka/blob/dfa3c8a9/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java b/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java
index 4e52d61..f01116e 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java
@@ -19,10 +19,10 @@ package org.apache.kafka.common.record;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.utils.ByteBufferInputStream;
+import org.apache.kafka.common.utils.ByteBufferOutputStream;
import org.apache.kafka.common.utils.ByteUtils;
import org.apache.kafka.common.utils.CloseableIterator;
import org.apache.kafka.common.utils.Crc32C;
-import org.apache.kafka.common.utils.Utils;
import java.io.DataInputStream;
import java.io.IOException;
@@ -207,6 +207,11 @@ public class DefaultRecordBatch extends AbstractRecordBatch implements MutableRe
}
@Override
+ public void writeTo(ByteBufferOutputStream outputStream) {
+ outputStream.write(this.buffer.duplicate());
+ }
+
+ @Override
public boolean isTransactional() {
return (attributes() & TRANSACTIONAL_FLAG_MASK) > 0;
}
@@ -447,13 +452,6 @@ public class DefaultRecordBatch extends AbstractRecordBatch implements MutableRe
/**
* Get an upper bound on the size of a batch with only a single record using a given key and value.
*/
- static int batchSizeUpperBound(byte[] key, byte[] value, Header[] headers) {
- return batchSizeUpperBound(Utils.wrapNullable(key), Utils.wrapNullable(value), headers);
- }
-
- /**
- * Get an upper bound on the size of a batch with only a single record using a given key and value.
- */
static int batchSizeUpperBound(ByteBuffer key, ByteBuffer value, Header[] headers) {
return RECORD_BATCH_OVERHEAD + DefaultRecord.recordSizeUpperBound(key, value, headers);
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/dfa3c8a9/clients/src/main/java/org/apache/kafka/common/record/FileLogInputStream.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/FileLogInputStream.java b/clients/src/main/java/org/apache/kafka/common/record/FileLogInputStream.java
index 1af5527..5fe1cef 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/FileLogInputStream.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/FileLogInputStream.java
@@ -35,22 +35,18 @@ public class FileLogInputStream implements LogInputStream<FileLogInputStream.Fil
private int position;
private final int end;
private final FileChannel channel;
- private final int maxRecordSize;
private final ByteBuffer logHeaderBuffer = ByteBuffer.allocate(LOG_OVERHEAD);
/**
* Create a new log input stream over the FileChannel
* @param channel Underlying FileChannel
- * @param maxRecordSize Maximum size of records
* @param start Position in the file channel to start from
* @param end Position in the file channel not to read past
*/
FileLogInputStream(FileChannel channel,
- int maxRecordSize,
int start,
int end) {
this.channel = channel;
- this.maxRecordSize = maxRecordSize;
this.position = start;
this.end = end;
}
@@ -71,9 +67,6 @@ public class FileLogInputStream implements LogInputStream<FileLogInputStream.Fil
if (size < LegacyRecord.RECORD_OVERHEAD_V0)
throw new CorruptRecordException(String.format("Record size is smaller than minimum record overhead (%d).", LegacyRecord.RECORD_OVERHEAD_V0));
- if (size > maxRecordSize)
- throw new CorruptRecordException(String.format("Record size exceeds the largest allowable message size (%d).", maxRecordSize));
-
if (position + LOG_OVERHEAD + size > end)
return null;
http://git-wip-us.apache.org/repos/asf/kafka/blob/dfa3c8a9/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java b/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java
index 16d3777..a72ba8b 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java
@@ -339,35 +339,22 @@ public class FileRecords extends AbstractRecords implements Closeable {
return batches;
}
- /**
- * Get an iterator over the record batches, enforcing a maximum record size
- * @param maxRecordSize The maximum allowable size of individual records (including compressed record sets)
- * @return An iterator over the batches
- */
- public Iterable<FileChannelRecordBatch> batches(int maxRecordSize) {
- return batches(maxRecordSize, start);
- }
-
- private Iterable<FileChannelRecordBatch> batchesFrom(int start) {
- return batches(Integer.MAX_VALUE, start);
- }
-
- private Iterable<FileChannelRecordBatch> batches(final int maxRecordSize, final int start) {
+ private Iterable<FileChannelRecordBatch> batchesFrom(final int start) {
return new Iterable<FileChannelRecordBatch>() {
@Override
public Iterator<FileChannelRecordBatch> iterator() {
- return batchIterator(maxRecordSize, start);
+ return batchIterator(start);
}
};
}
- private Iterator<FileChannelRecordBatch> batchIterator(int maxRecordSize, int start) {
+ private Iterator<FileChannelRecordBatch> batchIterator(int start) {
final int end;
if (isSlice)
end = this.end;
else
end = this.sizeInBytes();
- FileLogInputStream inputStream = new FileLogInputStream(channel, maxRecordSize, start, end);
+ FileLogInputStream inputStream = new FileLogInputStream(channel, start, end);
return new RecordBatchIterator<>(inputStream);
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/dfa3c8a9/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
index d3bdee2..56d7ed1 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
@@ -16,6 +16,11 @@
*/
package org.apache.kafka.common.record;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.utils.ByteBufferOutputStream;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.GatheringByteChannel;
@@ -31,8 +36,8 @@ import java.util.Objects;
* or one of the {@link #builder(ByteBuffer, byte, CompressionType, TimestampType, long)} variants.
*/
public class MemoryRecords extends AbstractRecords {
-
- public final static MemoryRecords EMPTY = MemoryRecords.readableRecords(ByteBuffer.allocate(0));
+ private static final Logger log = LoggerFactory.getLogger(MemoryRecords.class);
+ public static final MemoryRecords EMPTY = MemoryRecords.readableRecords(ByteBuffer.allocate(0));
private final ByteBuffer buffer;
@@ -110,16 +115,21 @@ public class MemoryRecords extends AbstractRecords {
/**
* Filter the records into the provided ByteBuffer.
+ * @param partition The partition that is filtered (used only for logging)
* @param filter The filter function
* @param destinationBuffer The byte buffer to write the filtered records to
- * @return A FilterResult with a summary of the output (for metrics)
+ * @param maxRecordBatchSize The maximum record batch size. Note this is not a hard limit: if a batch
+ * exceeds this after filtering, we log a warning, but the batch will still be
+ * created.
+ * @return A FilterResult with a summary of the output (for metrics) and potentially an overflow buffer
*/
- public FilterResult filterTo(RecordFilter filter, ByteBuffer destinationBuffer) {
- return filterTo(batches(), filter, destinationBuffer);
+ public FilterResult filterTo(TopicPartition partition, RecordFilter filter, ByteBuffer destinationBuffer,
+ int maxRecordBatchSize) {
+ return filterTo(partition, batches(), filter, destinationBuffer, maxRecordBatchSize);
}
- private static FilterResult filterTo(Iterable<MutableRecordBatch> batches, RecordFilter filter,
- ByteBuffer destinationBuffer) {
+ private static FilterResult filterTo(TopicPartition partition, Iterable<MutableRecordBatch> batches,
+ RecordFilter filter, ByteBuffer destinationBuffer, int maxRecordBatchSize) {
long maxTimestamp = RecordBatch.NO_TIMESTAMP;
long maxOffset = -1L;
long shallowOffsetOfMaxTimestamp = -1L;
@@ -128,6 +138,8 @@ public class MemoryRecords extends AbstractRecords {
int messagesRetained = 0;
int bytesRetained = 0;
+ ByteBufferOutputStream bufferOutputStream = new ByteBufferOutputStream(destinationBuffer);
+
for (MutableRecordBatch batch : batches) {
bytesRead += batch.sizeInBytes();
@@ -140,7 +152,7 @@ public class MemoryRecords extends AbstractRecords {
// recopy the messages to the destination buffer.
byte batchMagic = batch.magic();
- boolean writeOriginalEntry = true;
+ boolean writeOriginalBatch = true;
List<Record> retainedRecords = new ArrayList<>();
for (Record record : batch) {
@@ -150,20 +162,19 @@ public class MemoryRecords extends AbstractRecords {
// Check for log corruption due to KAFKA-4298. If we find it, make sure that we overwrite
// the corrupted batch with correct data.
if (!record.hasMagic(batchMagic))
- writeOriginalEntry = false;
+ writeOriginalBatch = false;
if (record.offset() > maxOffset)
maxOffset = record.offset();
retainedRecords.add(record);
} else {
- writeOriginalEntry = false;
+ writeOriginalBatch = false;
}
}
- if (writeOriginalEntry) {
- // There are no messages compacted out and no message format conversion, write the original message set back
- batch.writeTo(destinationBuffer);
+ if (writeOriginalBatch) {
+ batch.writeTo(bufferOutputStream);
messagesRetained += retainedRecords.size();
bytesRetained += batch.sizeInBytes();
if (batch.maxTimestamp() > maxTimestamp) {
@@ -171,29 +182,18 @@ public class MemoryRecords extends AbstractRecords {
shallowOffsetOfMaxTimestamp = batch.lastOffset();
}
} else if (!retainedRecords.isEmpty()) {
- ByteBuffer slice = destinationBuffer.slice();
- TimestampType timestampType = batch.timestampType();
- long logAppendTime = timestampType == TimestampType.LOG_APPEND_TIME ? batch.maxTimestamp() : RecordBatch.NO_TIMESTAMP;
- long baseOffset = batchMagic >= RecordBatch.MAGIC_VALUE_V2 ?
- batch.baseOffset() : retainedRecords.get(0).offset();
-
- MemoryRecordsBuilder builder = builder(slice, batch.magic(), batch.compressionType(), timestampType,
- baseOffset, logAppendTime, batch.producerId(), batch.producerEpoch(), batch.baseSequence(),
- batch.isTransactional(), batch.partitionLeaderEpoch());
-
- for (Record record : retainedRecords)
- builder.append(record);
-
- if (batch.magic() >= RecordBatch.MAGIC_VALUE_V2)
- // we must preserve the last offset from the initial batch in order to ensure that the
- // last sequence number from the batch remains even after compaction. Otherwise, the producer
- // could incorrectly see an out of sequence error.
- builder.overrideLastOffset(batch.lastOffset());
-
+ MemoryRecordsBuilder builder = buildRetainedRecordsInto(batch, retainedRecords, bufferOutputStream);
MemoryRecords records = builder.build();
- destinationBuffer.position(destinationBuffer.position() + slice.position());
+ int filteredBatchSize = records.sizeInBytes();
+
messagesRetained += retainedRecords.size();
- bytesRetained += records.sizeInBytes();
+ bytesRetained += filteredBatchSize;
+
+ if (filteredBatchSize > batch.sizeInBytes() && filteredBatchSize > maxRecordBatchSize)
+ log.warn("Record batch from {} with last offset {} exceeded max record batch size {} after cleaning " +
+ "(new size is {}). Consumers from version 0.10.1 and earlier may need to " +
+ "increase their fetch sizes.",
+ partition, batch.lastOffset(), maxRecordBatchSize, filteredBatchSize);
MemoryRecordsBuilder.RecordsInfo info = builder.info();
if (info.maxTimestamp > maxTimestamp) {
@@ -201,9 +201,44 @@ public class MemoryRecords extends AbstractRecords {
shallowOffsetOfMaxTimestamp = info.shallowOffsetOfMaxTimestamp;
}
}
+
+ // If we had to allocate a new buffer to fit the filtered output (see KAFKA-5316), return early to
+ // avoid the need for additional allocations.
+ ByteBuffer outputBuffer = bufferOutputStream.buffer();
+ if (outputBuffer != destinationBuffer)
+ return new FilterResult(outputBuffer, messagesRead, bytesRead, messagesRetained, bytesRetained,
+ maxOffset, maxTimestamp, shallowOffsetOfMaxTimestamp);
}
- return new FilterResult(messagesRead, bytesRead, messagesRetained, bytesRetained, maxOffset, maxTimestamp, shallowOffsetOfMaxTimestamp);
+ return new FilterResult(destinationBuffer, messagesRead, bytesRead, messagesRetained, bytesRetained,
+ maxOffset, maxTimestamp, shallowOffsetOfMaxTimestamp);
+ }
+
+ private static MemoryRecordsBuilder buildRetainedRecordsInto(RecordBatch originalBatch,
+ List<Record> retainedRecords,
+ ByteBufferOutputStream bufferOutputStream) {
+ byte magic = originalBatch.magic();
+ TimestampType timestampType = originalBatch.timestampType();
+ long logAppendTime = timestampType == TimestampType.LOG_APPEND_TIME ?
+ originalBatch.maxTimestamp() : RecordBatch.NO_TIMESTAMP;
+ long baseOffset = magic >= RecordBatch.MAGIC_VALUE_V2 ?
+ originalBatch.baseOffset() : retainedRecords.get(0).offset();
+
+ MemoryRecordsBuilder builder = new MemoryRecordsBuilder(bufferOutputStream, magic,
+ originalBatch.compressionType(), timestampType, baseOffset, logAppendTime, originalBatch.producerId(),
+ originalBatch.producerEpoch(), originalBatch.baseSequence(), originalBatch.isTransactional(),
+ originalBatch.isControlBatch(), originalBatch.partitionLeaderEpoch(), bufferOutputStream.limit());
+
+ for (Record record : retainedRecords)
+ builder.append(record);
+
+ if (magic >= RecordBatch.MAGIC_VALUE_V2)
+ // we must preserve the last offset from the initial batch in order to ensure that the
+ // last sequence number from the batch remains even after compaction. Otherwise, the producer
+ // could incorrectly see an out of sequence error.
+ builder.overrideLastOffset(originalBatch.lastOffset());
+
+ return builder;
}
/**
@@ -271,6 +306,7 @@ public class MemoryRecords extends AbstractRecords {
}
public static class FilterResult {
+ public final ByteBuffer output;
public final int messagesRead;
public final int bytesRead;
public final int messagesRetained;
@@ -279,13 +315,15 @@ public class MemoryRecords extends AbstractRecords {
public final long maxTimestamp;
public final long shallowOffsetOfMaxTimestamp;
- public FilterResult(int messagesRead,
+ public FilterResult(ByteBuffer output,
+ int messagesRead,
int bytesRead,
int messagesRetained,
int bytesRetained,
long maxOffset,
long maxTimestamp,
long shallowOffsetOfMaxTimestamp) {
+ this.output = output;
this.messagesRead = messagesRead;
this.bytesRead = bytesRead;
this.messagesRetained = messagesRetained;
http://git-wip-us.apache.org/repos/asf/kafka/blob/dfa3c8a9/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
index e055aa5..49d70c6 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
@@ -49,7 +49,7 @@ public class MemoryRecordsBuilder {
// so it's not safe to hold a direct reference to the underlying ByteBuffer.
private final ByteBufferOutputStream bufferStream;
private final byte magic;
- private final int initPos;
+ private final int initialPosition;
private final long baseOffset;
private final long logAppendTime;
private final boolean isTransactional;
@@ -75,25 +75,7 @@ public class MemoryRecordsBuilder {
private MemoryRecords builtRecords;
private boolean aborted = false;
- /**
- * Construct a new builder.
- *
- * @param buffer The underlying buffer to use (note that this class will allocate a new buffer if necessary
- * to fit the records appended)
- * @param magic The magic value to use
- * @param compressionType The compression codec to use
- * @param timestampType The desired timestamp type. For magic > 0, this cannot be {@link TimestampType#NO_TIMESTAMP_TYPE}.
- * @param baseOffset The initial offset to use for
- * @param logAppendTime The log append time of this record set. Can be set to NO_TIMESTAMP if CREATE_TIME is used.
- * @param producerId The producer ID associated with the producer writing this record set
- * @param producerEpoch The epoch of the producer
- * @param baseSequence The sequence number of the first record in this set
- * @param isTransactional Whether or not the records are part of a transaction
- * @param writeLimit The desired limit on the total bytes for this record set (note that this can be exceeded
- * when compression is used since size estimates are rough, and in the case that the first
- * record added exceeds the size).
- */
- public MemoryRecordsBuilder(ByteBuffer buffer,
+ public MemoryRecordsBuilder(ByteBufferOutputStream bufferStream,
byte magic,
CompressionType compressionType,
TimestampType timestampType,
@@ -120,7 +102,6 @@ public class MemoryRecordsBuilder {
this.compressionType = compressionType;
this.baseOffset = baseOffset;
this.logAppendTime = logAppendTime;
- this.initPos = buffer.position();
this.numRecords = 0;
this.writtenUncompressed = 0;
this.actualCompressionRatio = 1;
@@ -132,22 +113,62 @@ public class MemoryRecordsBuilder {
this.isControlBatch = isControlBatch;
this.partitionLeaderEpoch = partitionLeaderEpoch;
this.writeLimit = writeLimit;
- this.initialCapacity = buffer.capacity();
+
+ this.initialPosition = bufferStream.position();
+ this.initialCapacity = bufferStream.capacity();
if (magic > RecordBatch.MAGIC_VALUE_V1) {
- buffer.position(initPos + DefaultRecordBatch.RECORDS_OFFSET);
+ bufferStream.position(initialPosition + DefaultRecordBatch.RECORDS_OFFSET);
} else if (compressionType != CompressionType.NONE) {
// for compressed records, leave space for the header and the shallow message metadata
// and move the starting position to the value payload offset
- buffer.position(initPos + Records.LOG_OVERHEAD + LegacyRecord.recordOverhead(magic));
+ bufferStream.position(initialPosition + Records.LOG_OVERHEAD + LegacyRecord.recordOverhead(magic));
}
// create the stream
- bufferStream = new ByteBufferOutputStream(buffer);
- appendStream = new DataOutputStream(compressionType.wrapForOutput(bufferStream, magic,
+ this.bufferStream = bufferStream;
+ this.appendStream = new DataOutputStream(compressionType.wrapForOutput(this.bufferStream, magic,
COMPRESSION_DEFAULT_BUFFER_SIZE));
}
+ /**
+ * Construct a new builder.
+ *
+ * @param buffer The underlying buffer to use (note that this class will allocate a new buffer if necessary
+ * to fit the records appended)
+ * @param magic The magic value to use
+ * @param compressionType The compression codec to use
+ * @param timestampType The desired timestamp type. For magic > 0, this cannot be {@link TimestampType#NO_TIMESTAMP_TYPE}.
+ * @param baseOffset The initial offset to use for
+ * @param logAppendTime The log append time of this record set. Can be set to NO_TIMESTAMP if CREATE_TIME is used.
+ * @param producerId The producer ID associated with the producer writing this record set
+ * @param producerEpoch The epoch of the producer
+ * @param baseSequence The sequence number of the first record in this set
+ * @param isTransactional Whether or not the records are part of a transaction
+ * @param isControlBatch Whether or not this is a control batch (e.g. for transaction markers)
+ * @param partitionLeaderEpoch The epoch of the partition leader appending the record set to the log
+ * @param writeLimit The desired limit on the total bytes for this record set (note that this can be exceeded
+ * when compression is used since size estimates are rough, and in the case that the first
+ * record added exceeds the size).
+ */
+ public MemoryRecordsBuilder(ByteBuffer buffer,
+ byte magic,
+ CompressionType compressionType,
+ TimestampType timestampType,
+ long baseOffset,
+ long logAppendTime,
+ long producerId,
+ short producerEpoch,
+ int baseSequence,
+ boolean isTransactional,
+ boolean isControlBatch,
+ int partitionLeaderEpoch,
+ int writeLimit) {
+ this(new ByteBufferOutputStream(buffer), magic, compressionType, timestampType, baseOffset, logAppendTime,
+ producerId, producerEpoch, baseSequence, isTransactional, isControlBatch, partitionLeaderEpoch,
+ writeLimit);
+ }
+
public ByteBuffer buffer() {
return bufferStream.buffer();
}
@@ -168,6 +189,10 @@ public class MemoryRecordsBuilder {
return isControlBatch;
}
+ public boolean isTransactional() {
+ return isTransactional;
+ }
+
/**
* Close this builder and return the resulting buffer.
* @return The built log buffer
@@ -249,7 +274,7 @@ public class MemoryRecordsBuilder {
public void abort() {
closeForRecordAppends();
- buffer().position(initPos);
+ buffer().position(initialPosition);
aborted = true;
}
@@ -260,24 +285,12 @@ public class MemoryRecordsBuilder {
if (builtRecords != null)
return;
- if (isTransactional && producerId == RecordBatch.NO_PRODUCER_ID)
- throw new IllegalArgumentException("Cannot write transactional messages without a valid producer ID");
-
- if (producerId != RecordBatch.NO_PRODUCER_ID) {
- if (producerEpoch == RecordBatch.NO_PRODUCER_EPOCH)
- throw new IllegalArgumentException("Invalid negative producer epoch");
-
- if (baseSequence < 0 && !isControlBatch)
- throw new IllegalArgumentException("Invalid negative sequence number used");
-
- if (magic < RecordBatch.MAGIC_VALUE_V2)
- throw new IllegalArgumentException("Idempotent messages are not supported for magic " + magic);
- }
+ validateProducerState();
closeForRecordAppends();
if (numRecords == 0L) {
- buffer().position(initPos);
+ buffer().position(initialPosition);
builtRecords = MemoryRecords.EMPTY;
} else {
if (magic > RecordBatch.MAGIC_VALUE_V1)
@@ -287,11 +300,27 @@ public class MemoryRecordsBuilder {
ByteBuffer buffer = buffer().duplicate();
buffer.flip();
- buffer.position(initPos);
+ buffer.position(initialPosition);
builtRecords = MemoryRecords.readableRecords(buffer.slice());
}
}
+ private void validateProducerState() {
+ if (isTransactional && producerId == RecordBatch.NO_PRODUCER_ID)
+ throw new IllegalArgumentException("Cannot write transactional messages without a valid producer ID");
+
+ if (producerId != RecordBatch.NO_PRODUCER_ID) {
+ if (producerEpoch == RecordBatch.NO_PRODUCER_EPOCH)
+ throw new IllegalArgumentException("Invalid negative producer epoch");
+
+ if (baseSequence < 0 && !isControlBatch)
+ throw new IllegalArgumentException("Invalid negative sequence number used");
+
+ if (magic < RecordBatch.MAGIC_VALUE_V2)
+ throw new IllegalArgumentException("Idempotent messages are not supported for magic " + magic);
+ }
+ }
+
/**
* Write the header to the default batch.
* @return the written compressed bytes.
@@ -300,8 +329,8 @@ public class MemoryRecordsBuilder {
ensureOpenForRecordBatchWrite();
ByteBuffer buffer = bufferStream.buffer();
int pos = buffer.position();
- buffer.position(initPos);
- int size = pos - initPos;
+ buffer.position(initialPosition);
+ int size = pos - initialPosition;
int writtenCompressed = size - DefaultRecordBatch.RECORD_BATCH_OVERHEAD;
int offsetDelta = (int) (lastOffset - baseOffset);
@@ -331,9 +360,9 @@ public class MemoryRecordsBuilder {
ensureOpenForRecordBatchWrite();
ByteBuffer buffer = bufferStream.buffer();
int pos = buffer.position();
- buffer.position(initPos);
+ buffer.position(initialPosition);
- int wrapperSize = pos - initPos - Records.LOG_OVERHEAD;
+ int wrapperSize = pos - initialPosition - Records.LOG_OVERHEAD;
int writtenCompressed = wrapperSize - LegacyRecord.recordOverhead(magic);
AbstractLegacyRecordBatch.writeHeader(buffer, lastOffset, wrapperSize);
@@ -544,7 +573,7 @@ public class MemoryRecordsBuilder {
* @param record the record to add
*/
public void append(Record record) {
- appendWithOffset(record.offset(), record.timestamp(), record.key(), record.value(), record.headers());
+ appendWithOffset(record.offset(), isControlBatch, record.timestamp(), record.key(), record.value(), record.headers());
}
/**
@@ -736,4 +765,5 @@ public class MemoryRecordsBuilder {
public short producerEpoch() {
return this.producerEpoch;
}
+
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/dfa3c8a9/clients/src/main/java/org/apache/kafka/common/record/MutableRecordBatch.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/MutableRecordBatch.java b/clients/src/main/java/org/apache/kafka/common/record/MutableRecordBatch.java
index 2f0a96c..728b6eb 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/MutableRecordBatch.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/MutableRecordBatch.java
@@ -16,6 +16,8 @@
*/
package org.apache.kafka.common.record;
+import org.apache.kafka.common.utils.ByteBufferOutputStream;
+
/**
* A mutable record batch is one that can be modified in place (without copying). This is used by the broker
* to override certain fields in the batch before appending it to the log.
@@ -42,4 +44,11 @@ public interface MutableRecordBatch extends RecordBatch {
* @param epoch The partition leader epoch to use
*/
void setPartitionLeaderEpoch(int epoch);
+
+ /**
+ * Write this record batch into an output stream.
+ * @param outputStream The buffer to write the batch to
+ */
+ void writeTo(ByteBufferOutputStream outputStream);
+
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/dfa3c8a9/clients/src/main/java/org/apache/kafka/common/utils/ByteBufferOutputStream.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/utils/ByteBufferOutputStream.java b/clients/src/main/java/org/apache/kafka/common/utils/ByteBufferOutputStream.java
index 9480c6d..79d4d4c 100644
--- a/clients/src/main/java/org/apache/kafka/common/utils/ByteBufferOutputStream.java
+++ b/clients/src/main/java/org/apache/kafka/common/utils/ByteBufferOutputStream.java
@@ -20,16 +20,18 @@ import java.io.OutputStream;
import java.nio.ByteBuffer;
/**
- * A byte buffer backed output outputStream
+ * A ByteBuffer-backed OutputStream
*/
public class ByteBufferOutputStream extends OutputStream {
private static final float REALLOCATION_FACTOR = 1.1f;
private ByteBuffer buffer;
+ private int initialPosition;
public ByteBufferOutputStream(ByteBuffer buffer) {
this.buffer = buffer;
+ this.initialPosition = buffer.position();
}
public void write(int b) {
@@ -40,18 +42,55 @@ public class ByteBufferOutputStream extends OutputStream {
public void write(byte[] bytes, int off, int len) {
if (buffer.remaining() < len)
- expandBuffer(buffer.capacity() + len);
+ expandBuffer(buffer.position() + len);
buffer.put(bytes, off, len);
}
+ public void write(ByteBuffer buffer) {
+ if (buffer.hasArray())
+ write(buffer.array(), buffer.arrayOffset() + buffer.position(), buffer.remaining());
+ else {
+ int pos = buffer.position();
+ for (int i = pos, limit = buffer.remaining() + pos; i < limit; i++)
+ write(buffer.get(i));
+ }
+ }
+
public ByteBuffer buffer() {
return buffer;
}
+ public int position() {
+ return buffer.position();
+ }
+
+ public int capacity() {
+ return buffer.capacity();
+ }
+
+ public int limit() {
+ return buffer.limit();
+ }
+
+ public void position(int position) {
+ if (position > buffer.limit())
+ expandBuffer(position);
+ buffer.position(position);
+ }
+
private void expandBuffer(int size) {
int expandSize = Math.max((int) (buffer.capacity() * REALLOCATION_FACTOR), size);
ByteBuffer temp = ByteBuffer.allocate(expandSize);
- temp.put(buffer.array(), buffer.arrayOffset(), buffer.position());
+ if (buffer.hasArray()) {
+ temp.put(buffer.array(), buffer.arrayOffset(), buffer.position());
+ } else {
+ int limit = buffer.position();
+ for (int i = 0; i < limit; i++)
+ temp.put(buffer.get(i));
+ }
+
+ // reset the old buffer's position so that the partial data in the new buffer cannot be mistakenly consumed
+ buffer.position(initialPosition);
buffer = temp;
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/dfa3c8a9/clients/src/test/java/org/apache/kafka/clients/MockClient.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/MockClient.java b/clients/src/test/java/org/apache/kafka/clients/MockClient.java
index 9ca95e3..ce3c599 100644
--- a/clients/src/test/java/org/apache/kafka/clients/MockClient.java
+++ b/clients/src/test/java/org/apache/kafka/clients/MockClient.java
@@ -153,8 +153,7 @@ public class MockClient implements KafkaClient {
short version = nodeApiVersions.usableVersion(request.apiKey(), builder.desiredVersion());
AbstractRequest abstractRequest = request.requestBuilder().build(version);
if (!futureResp.requestMatcher.matches(abstractRequest))
- throw new IllegalStateException("Next in line response did not match expected request, request: "
- + abstractRequest);
+ throw new IllegalStateException("Request matcher did not match next-in-line request " + abstractRequest);
ClientResponse resp = new ClientResponse(request.makeHeader(version), request.callback(), request.destination(),
request.createdTimeMs(), time.milliseconds(), futureResp.disconnected, null, futureResp.responseBody);
responses.add(resp);
@@ -195,6 +194,18 @@ public class MockClient implements KafkaClient {
respond(response, false);
}
+ public void respond(RequestMatcher matcher, AbstractResponse response) {
+ ClientRequest nextRequest = requests.peek();
+ if (nextRequest == null)
+ throw new IllegalStateException("No current requests queued");
+
+ AbstractRequest request = nextRequest.requestBuilder().build();
+ if (!matcher.matches(request))
+ throw new IllegalStateException("Request matcher did not match next-in-line request " + request);
+
+ respond(response);
+ }
+
public void respond(AbstractResponse response, boolean disconnected) {
ClientRequest request = requests.remove();
short version = request.requestBuilder().desiredOrLatestVersion();
http://git-wip-us.apache.org/repos/asf/kafka/blob/dfa3c8a9/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
index f48ab33..e079f2a 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
@@ -32,8 +32,11 @@ import org.apache.kafka.common.record.CompressionRatioEstimator;
import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.record.DefaultRecord;
import org.apache.kafka.common.record.DefaultRecordBatch;
+import org.apache.kafka.common.record.MemoryRecords;
+import org.apache.kafka.common.record.MemoryRecordsBuilder;
import org.apache.kafka.common.record.MutableRecordBatch;
import org.apache.kafka.common.record.Record;
+import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.requests.ApiVersionsResponse;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
@@ -563,7 +566,10 @@ public class RecordAccumulatorTest {
RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, CompressionType.GZIP, 10, 100L, metrics, time,
new ApiVersions(), null);
// Create a big batch
- ProducerBatch batch = ProducerBatch.createBatchOffAccumulator(tp1, CompressionType.NONE, 4096, now);
+ ByteBuffer buffer = ByteBuffer.allocate(4096);
+ MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, CompressionType.NONE, TimestampType.CREATE_TIME, 0L);
+ ProducerBatch batch = new ProducerBatch(tp1, builder, now, true);
+
byte[] value = new byte[1024];
final AtomicInteger acked = new AtomicInteger(0);
Callback cb = new Callback() {
http://git-wip-us.apache.org/repos/asf/kafka/blob/dfa3c8a9/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
index c08ea57..77b1da8 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
@@ -38,14 +38,14 @@ import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.record.CompressionRatioEstimator;
import org.apache.kafka.common.record.CompressionType;
+import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.MutableRecordBatch;
import org.apache.kafka.common.record.RecordBatch;
-import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.requests.AbstractRequest;
import org.apache.kafka.common.requests.ApiVersionsResponse;
import org.apache.kafka.common.requests.InitProducerIdRequest;
-import org.apache.kafka.common.requests.ProduceRequest;
import org.apache.kafka.common.requests.InitProducerIdResponse;
+import org.apache.kafka.common.requests.ProduceRequest;
import org.apache.kafka.common.requests.ProduceResponse;
import org.apache.kafka.common.requests.ResponseHeader;
import org.apache.kafka.common.utils.MockTime;
@@ -61,6 +61,7 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
+import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
@@ -554,28 +555,18 @@ public class SenderTest {
CompressionRatioEstimator.setEstimation(topic, CompressionType.GZIP, 0.2f);
Metrics m = new Metrics();
TransactionManager txnManager = new TransactionManager("testSplitBatchAndSend", 0);
- txnManager.setProducerIdAndEpoch(new ProducerIdAndEpoch(123456L, (short) 0));
+ ProducerIdAndEpoch producerIdAndEpoch = new ProducerIdAndEpoch(123456L, (short) 0);
+ txnManager.setProducerIdAndEpoch(producerIdAndEpoch);
accumulator = new RecordAccumulator(batchSize, 1024 * 1024, CompressionType.GZIP, 0L, 0L, m, time,
new ApiVersions(), txnManager);
try {
- Sender sender = new Sender(client,
- metadata,
- this.accumulator,
- true,
- MAX_REQUEST_SIZE,
- ACKS_ALL,
- maxRetries,
- m,
- time,
- REQUEST_TIMEOUT,
- 1000L,
- txnManager,
- new ApiVersions());
+ Sender sender = new Sender(client, metadata, this.accumulator, true, MAX_REQUEST_SIZE, ACKS_ALL, maxRetries,
+ m, time, REQUEST_TIMEOUT, 1000L, txnManager, new ApiVersions());
// Create a two broker cluster, with partition 0 on broker 0 and partition 1 on broker 1
Cluster cluster1 = TestUtils.clusterWith(2, topic, 2);
metadata.update(cluster1, Collections.<String>emptySet(), time.milliseconds());
// Send the first message.
- TopicPartition tp2 = new TopicPartition(topic, 1);
+ final TopicPartition tp2 = new TopicPartition(topic, 1);
Future<RecordMetadata> f1 =
accumulator.append(tp2, 0L, "key1".getBytes(), new byte[batchSize / 2], null, null, MAX_BLOCK_TIMEOUT).future;
Future<RecordMetadata> f2 =
@@ -607,7 +598,8 @@ public class SenderTest {
assertTrue("Client ready status should be true", client.isReady(node, 0L));
responseMap.put(tp2, new ProduceResponse.PartitionResponse(Errors.NONE, 0L, 0L));
- client.respond(new ProduceResponse(responseMap));
+ client.respond(produceRequestMatcher(tp2, producerIdAndEpoch, 0, false), new ProduceResponse(responseMap));
+
sender.run(time.milliseconds()); // receive
assertTrue("The future should have been done.", f1.isDone());
assertEquals("The sequence number should be 1", 1, txnManager.sequenceNumber(tp2).longValue());
@@ -621,7 +613,8 @@ public class SenderTest {
assertTrue("Client ready status should be true", client.isReady(node, 0L));
responseMap.put(tp2, new ProduceResponse.PartitionResponse(Errors.NONE, 1L, 0L));
- client.respond(new ProduceResponse(responseMap));
+ client.respond(produceRequestMatcher(tp2, producerIdAndEpoch, 1, false), new ProduceResponse(responseMap));
+
sender.run(time.milliseconds()); // receive
assertTrue("The future should have been done.", f2.isDone());
assertEquals("The sequence number should be 2", 2, txnManager.sequenceNumber(tp2).longValue());
@@ -635,6 +628,36 @@ public class SenderTest {
}
}
+ private MockClient.RequestMatcher produceRequestMatcher(final TopicPartition tp,
+ final ProducerIdAndEpoch producerIdAndEpoch,
+ final int sequence,
+ final boolean isTransactional) {
+ return new MockClient.RequestMatcher() {
+ @Override
+ public boolean matches(AbstractRequest body) {
+ if (!(body instanceof ProduceRequest))
+ return false;
+
+ ProduceRequest request = (ProduceRequest) body;
+ Map<TopicPartition, MemoryRecords> recordsMap = request.partitionRecordsOrFail();
+ MemoryRecords records = recordsMap.get(tp);
+ if (records == null)
+ return false;
+
+ List<MutableRecordBatch> batches = TestUtils.toList(records.batches());
+ if (batches.isEmpty() || batches.size() > 1)
+ return false;
+
+ MutableRecordBatch batch = batches.get(0);
+ return batch.baseOffset() == 0L &&
+ batch.baseSequence() == sequence &&
+ batch.producerId() == producerIdAndEpoch.producerId &&
+ batch.producerEpoch() == producerIdAndEpoch.epoch &&
+ batch.isTransactional() == isTransactional;
+ }
+ };
+ }
+
private void completedWithError(Future<RecordMetadata> future, Errors error) throws Exception {
assertTrue("Request should be completed", future.isDone());
try {
http://git-wip-us.apache.org/repos/asf/kafka/blob/dfa3c8a9/clients/src/test/java/org/apache/kafka/common/record/FileLogInputStreamTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/record/FileLogInputStreamTest.java b/clients/src/test/java/org/apache/kafka/common/record/FileLogInputStreamTest.java
index 65de01c..7c37354 100644
--- a/clients/src/test/java/org/apache/kafka/common/record/FileLogInputStreamTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/record/FileLogInputStreamTest.java
@@ -37,7 +37,7 @@ public class FileLogInputStreamTest {
new SimpleRecord("bar".getBytes())));
fileRecords.flush();
- FileLogInputStream logInputStream = new FileLogInputStream(fileRecords.channel(), Integer.MAX_VALUE, 0,
+ FileLogInputStream logInputStream = new FileLogInputStream(fileRecords.channel(), 0,
fileRecords.sizeInBytes());
FileLogInputStream.FileChannelRecordBatch batch = logInputStream.nextBatch();
http://git-wip-us.apache.org/repos/asf/kafka/blob/dfa3c8a9/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java
index afd0126..a7058a3 100644
--- a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java
@@ -16,6 +16,7 @@
*/
package org.apache.kafka.common.record;
+import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.test.TestUtils;
import org.junit.Test;
@@ -30,6 +31,7 @@ import java.util.List;
import static java.util.Arrays.asList;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
@@ -202,7 +204,8 @@ public class MemoryRecordsTest {
builder.append(12L, null, "c".getBytes());
ByteBuffer filtered = ByteBuffer.allocate(2048);
- builder.build().filterTo(new RetainNonNullKeysFilter(), filtered);
+ builder.build().filterTo(new TopicPartition("foo", 0), new RetainNonNullKeysFilter(), filtered,
+ Integer.MAX_VALUE);
filtered.flip();
MemoryRecords filteredRecords = MemoryRecords.readableRecords(filtered);
@@ -278,7 +281,7 @@ public class MemoryRecordsTest {
buffer.flip();
ByteBuffer filtered = ByteBuffer.allocate(2048);
- MemoryRecords.readableRecords(buffer).filterTo(new MemoryRecords.RecordFilter() {
+ MemoryRecords.readableRecords(buffer).filterTo(new TopicPartition("foo", 0), new MemoryRecords.RecordFilter() {
@Override
protected boolean shouldDiscard(RecordBatch batch) {
// discard the second and fourth batches
@@ -289,7 +292,7 @@ public class MemoryRecordsTest {
protected boolean shouldRetain(RecordBatch recordBatch, Record record) {
return true;
}
- }, filtered);
+ }, filtered, Integer.MAX_VALUE);
filtered.flip();
MemoryRecords filteredRecords = MemoryRecords.readableRecords(filtered);
@@ -316,7 +319,8 @@ public class MemoryRecordsTest {
buffer.flip();
ByteBuffer filtered = ByteBuffer.allocate(2048);
- MemoryRecords.readableRecords(buffer).filterTo(new RetainNonNullKeysFilter(), filtered);
+ MemoryRecords.readableRecords(buffer).filterTo(new TopicPartition("foo", 0), new RetainNonNullKeysFilter(),
+ filtered, Integer.MAX_VALUE);
filtered.flip();
MemoryRecords filteredRecords = MemoryRecords.readableRecords(filtered);
@@ -383,7 +387,8 @@ public class MemoryRecordsTest {
buffer.flip();
ByteBuffer filtered = ByteBuffer.allocate(2048);
- MemoryRecords.readableRecords(buffer).filterTo(new RetainNonNullKeysFilter(), filtered);
+ MemoryRecords.readableRecords(buffer).filterTo(new TopicPartition("foo", 0), new RetainNonNullKeysFilter(),
+ filtered, Integer.MAX_VALUE);
filtered.flip();
MemoryRecords filteredRecords = MemoryRecords.readableRecords(filtered);
@@ -440,6 +445,55 @@ public class MemoryRecordsTest {
}
@Test
+ public void testFilterToWithUndersizedBuffer() {
+ ByteBuffer buffer = ByteBuffer.allocate(1024);
+ MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, magic, compression, TimestampType.CREATE_TIME, 0L);
+ builder.append(10L, null, "a".getBytes());
+ builder.close();
+
+ builder = MemoryRecords.builder(buffer, magic, compression, TimestampType.CREATE_TIME, 1L);
+ builder.append(11L, "1".getBytes(), new byte[128]);
+ builder.append(12L, "2".getBytes(), "c".getBytes());
+ builder.append(13L, null, "d".getBytes());
+ builder.close();
+
+ builder = MemoryRecords.builder(buffer, magic, compression, TimestampType.CREATE_TIME, 4L);
+ builder.append(14L, null, "e".getBytes());
+ builder.append(15L, "5".getBytes(), "f".getBytes());
+ builder.append(16L, "6".getBytes(), "g".getBytes());
+ builder.close();
+
+ builder = MemoryRecords.builder(buffer, magic, compression, TimestampType.CREATE_TIME, 7L);
+ builder.append(17L, "7".getBytes(), new byte[128]);
+ builder.close();
+
+ buffer.flip();
+
+ ByteBuffer output = ByteBuffer.allocate(64);
+
+ List<Record> records = new ArrayList<>();
+ while (buffer.hasRemaining()) {
+ output.rewind();
+
+ MemoryRecords.FilterResult result = MemoryRecords.readableRecords(buffer)
+ .filterTo(new TopicPartition("foo", 0), new RetainNonNullKeysFilter(), output, Integer.MAX_VALUE);
+
+ buffer.position(buffer.position() + result.bytesRead);
+ result.output.flip();
+
+ if (output != result.output)
+ assertEquals(0, output.position());
+
+ MemoryRecords filtered = MemoryRecords.readableRecords(result.output);
+ records.addAll(TestUtils.toList(filtered.records()));
+ }
+
+ assertEquals(5, records.size());
+ for (Record record : records)
+ assertNotNull(record.key());
+ }
+
+ @Test
public void testFilterTo() {
ByteBuffer buffer = ByteBuffer.allocate(2048);
MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, magic, compression, TimestampType.CREATE_TIME, 0L);
@@ -464,7 +518,8 @@ public class MemoryRecordsTest {
buffer.flip();
ByteBuffer filtered = ByteBuffer.allocate(2048);
- MemoryRecords.FilterResult result = MemoryRecords.readableRecords(buffer).filterTo(new RetainNonNullKeysFilter(), filtered);
+ MemoryRecords.FilterResult result = MemoryRecords.readableRecords(buffer).filterTo(
+ new TopicPartition("foo", 0), new RetainNonNullKeysFilter(), filtered, Integer.MAX_VALUE);
filtered.flip();
@@ -576,7 +631,8 @@ public class MemoryRecordsTest {
buffer.flip();
ByteBuffer filtered = ByteBuffer.allocate(2048);
- MemoryRecords.readableRecords(buffer).filterTo(new RetainNonNullKeysFilter(), filtered);
+ MemoryRecords.readableRecords(buffer).filterTo(new TopicPartition("foo", 0), new RetainNonNullKeysFilter(),
+ filtered, Integer.MAX_VALUE);
filtered.flip();
MemoryRecords filteredRecords = MemoryRecords.readableRecords(filtered);
http://git-wip-us.apache.org/repos/asf/kafka/blob/dfa3c8a9/clients/src/test/java/org/apache/kafka/common/utils/ByteBufferOutputStreamTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/utils/ByteBufferOutputStreamTest.java b/clients/src/test/java/org/apache/kafka/common/utils/ByteBufferOutputStreamTest.java
new file mode 100644
index 0000000..2ef5672
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/common/utils/ByteBufferOutputStreamTest.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.utils;
+
+import org.junit.Test;
+
+import java.nio.ByteBuffer;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+
+public class ByteBufferOutputStreamTest {
+
+ @Test
+ public void testExpandByteBufferOnPositionIncrease() throws Exception {
+ testExpandByteBufferOnPositionIncrease(ByteBuffer.allocate(16));
+ }
+
+ @Test
+ public void testExpandDirectByteBufferOnPositionIncrease() throws Exception {
+ testExpandByteBufferOnPositionIncrease(ByteBuffer.allocateDirect(16));
+ }
+
+ private void testExpandByteBufferOnPositionIncrease(ByteBuffer initialBuffer) throws Exception {
+ ByteBufferOutputStream output = new ByteBufferOutputStream(initialBuffer);
+ output.write("hello".getBytes());
+ output.position(32);
+ assertEquals(32, output.position());
+ assertEquals(0, initialBuffer.position());
+
+ ByteBuffer buffer = output.buffer();
+ assertEquals(32, buffer.limit());
+ buffer.position(0);
+ buffer.limit(5);
+ byte[] bytes = new byte[5];
+ buffer.get(bytes);
+ assertArrayEquals("hello".getBytes(), bytes);
+ }
+
+ @Test
+ public void testExpandByteBufferOnWrite() throws Exception {
+ testExpandByteBufferOnWrite(ByteBuffer.allocate(16));
+ }
+
+ @Test
+ public void testExpandDirectByteBufferOnWrite() throws Exception {
+ testExpandByteBufferOnWrite(ByteBuffer.allocateDirect(16));
+ }
+
+ private void testExpandByteBufferOnWrite(ByteBuffer initialBuffer) throws Exception {
+ ByteBufferOutputStream output = new ByteBufferOutputStream(initialBuffer);
+ output.write("hello".getBytes());
+ output.write(new byte[27]);
+ assertEquals(32, output.position());
+ assertEquals(0, initialBuffer.position());
+
+ ByteBuffer buffer = output.buffer();
+ assertEquals(32, buffer.limit());
+ buffer.position(0);
+ buffer.limit(5);
+ byte[] bytes = new byte[5];
+ buffer.get(bytes);
+ assertArrayEquals("hello".getBytes(), bytes);
+ }
+
+ @Test
+ public void testWriteByteBuffer() {
+ testWriteByteBuffer(ByteBuffer.allocate(16));
+ }
+
+ @Test
+ public void testWriteDirectByteBuffer() {
+ testWriteByteBuffer(ByteBuffer.allocateDirect(16));
+ }
+
+ private void testWriteByteBuffer(ByteBuffer input) {
+ long value = 234239230L;
+ input.putLong(value);
+ input.flip();
+
+ ByteBufferOutputStream output = new ByteBufferOutputStream(ByteBuffer.allocate(32));
+ output.write(input);
+ assertEquals(8, output.position());
+ assertEquals(value, output.buffer().getLong(0));
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/dfa3c8a9/core/src/main/scala/kafka/log/Log.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index 67b9271..c37ea08 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -318,7 +318,7 @@ class Log(@volatile var dir: File,
loadProducersFromLog(stateManager, fetchDataInfo.records)
}
stateManager.updateMapEndOffset(segment.baseOffset)
- val bytesTruncated = segment.recover(config.maxMessageSize, stateManager, leaderEpochCache)
+ val bytesTruncated = segment.recover(stateManager, leaderEpochCache)
// once we have recovered the segment's data, take a snapshot to ensure that we won't
// need to reload the same segment again while recovering another segment.
http://git-wip-us.apache.org/repos/asf/kafka/blob/dfa3c8a9/core/src/main/scala/kafka/log/LogCleaner.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala
index 8eda2e1..b05e37f 100644
--- a/core/src/main/scala/kafka/log/LogCleaner.scala
+++ b/core/src/main/scala/kafka/log/LogCleaner.scala
@@ -516,22 +516,23 @@ private[log] class Cleaner(val id: Int,
source.log.readInto(readBuffer, position)
val records = MemoryRecords.readableRecords(readBuffer)
throttler.maybeThrottle(records.sizeInBytes)
- val result = records.filterTo(logCleanerFilter, writeBuffer)
+ val result = records.filterTo(topicPartition, logCleanerFilter, writeBuffer, maxLogMessageSize)
stats.readMessages(result.messagesRead, result.bytesRead)
stats.recopyMessages(result.messagesRetained, result.bytesRetained)
position += result.bytesRead
// if any messages are to be retained, write them out
- if (writeBuffer.position > 0) {
- writeBuffer.flip()
- val retained = MemoryRecords.readableRecords(writeBuffer)
+ val outputBuffer = result.output
+ if (outputBuffer.position > 0) {
+ outputBuffer.flip()
+ val retained = MemoryRecords.readableRecords(outputBuffer)
dest.append(firstOffset = retained.batches.iterator.next().baseOffset,
largestOffset = result.maxOffset,
largestTimestamp = result.maxTimestamp,
shallowOffsetOfMaxTimestamp = result.shallowOffsetOfMaxTimestamp,
records = retained)
- throttler.maybeThrottle(writeBuffer.limit)
+ throttler.maybeThrottle(outputBuffer.limit)
}
// if we read bytes but didn't get even one complete message, our I/O buffer is too small, grow it and try again
http://git-wip-us.apache.org/repos/asf/kafka/blob/dfa3c8a9/core/src/main/scala/kafka/log/LogSegment.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogSegment.scala b/core/src/main/scala/kafka/log/LogSegment.scala
index 70269bb..3e4c47d 100755
--- a/core/src/main/scala/kafka/log/LogSegment.scala
+++ b/core/src/main/scala/kafka/log/LogSegment.scala
@@ -242,19 +242,16 @@ class LogSegment(val log: FileRecords,
index.fetchUpperBoundOffset(startOffsetPosition, fetchSize).map(_.offset)
/**
- * Run recovery on the given segment. This will rebuild the index from the log file and lop off any invalid bytes from the end of the log and index.
+ * Run recovery on the given segment. This will rebuild the index from the log file and lop off any invalid bytes
+ * from the end of the log and index.
*
- * @param maxMessageSize A bound the memory allocation in the case of a corrupt message size--we will assume any message larger than this
- * is corrupt.
* @param producerStateManager Producer state corresponding to the segment's base offset. This is needed to recover
* the transaction index.
* @param leaderEpochCache Optionally a cache for updating the leader epoch during recovery.
* @return The number of bytes truncated from the log
*/
@nonthreadsafe
- def recover(maxMessageSize: Int,
- producerStateManager: ProducerStateManager,
- leaderEpochCache: Option[LeaderEpochCache] = None): Int = {
+ def recover(producerStateManager: ProducerStateManager, leaderEpochCache: Option[LeaderEpochCache] = None): Int = {
index.truncate()
index.resize(index.maxIndexSize)
timeIndex.truncate()
@@ -264,7 +261,7 @@ class LogSegment(val log: FileRecords,
var lastIndexEntry = 0
maxTimestampSoFar = RecordBatch.NO_TIMESTAMP
try {
- for (batch <- log.batches(maxMessageSize).asScala) {
+ for (batch <- log.batches.asScala) {
batch.ensureValid()
// The max timestamp is exposed at the batch level, so no need to iterate the records
@@ -296,6 +293,9 @@ class LogSegment(val log: FileRecords,
.format(log.file.getAbsolutePath, validBytes, e.getMessage))
}
val truncated = log.sizeInBytes - validBytes
+ if (truncated > 0)
+ logger.debug(s"Truncated $truncated invalid bytes at the end of segment ${log.file.getAbsoluteFile} during recovery")
+
log.truncateTo(validBytes)
index.trimToValidSize()
// A normally closed segment always appends the biggest timestamp ever seen into log segment, we do this as well.
http://git-wip-us.apache.org/repos/asf/kafka/blob/dfa3c8a9/core/src/main/scala/kafka/tools/DumpLogSegments.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/DumpLogSegments.scala b/core/src/main/scala/kafka/tools/DumpLogSegments.scala
index f0c41c7..4de546f 100755
--- a/core/src/main/scala/kafka/tools/DumpLogSegments.scala
+++ b/core/src/main/scala/kafka/tools/DumpLogSegments.scala
@@ -337,8 +337,8 @@ object DumpLogSegments {
val messageSet = FileRecords.open(file, false)
var validBytes = 0L
var lastOffset = -1L
- val batches = messageSet.batches(maxMessageSize).asScala
- for (batch <- batches) {
+
+ for (batch <- messageSet.batches.asScala) {
if (isDeepIteration) {
for (record <- batch.asScala) {
if (lastOffset == -1)
http://git-wip-us.apache.org/repos/asf/kafka/blob/dfa3c8a9/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala b/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
index 5db1ed6..79fe220 100644
--- a/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
@@ -264,7 +264,7 @@ class LogSegmentTest {
seg.append(i, i, RecordBatch.NO_TIMESTAMP, -1L, records(i, i.toString))
val indexFile = seg.index.file
TestUtils.writeNonsenseToFile(indexFile, 5, indexFile.length.toInt)
- seg.recover(64*1024, new ProducerStateManager(topicPartition, logDir))
+ seg.recover(new ProducerStateManager(topicPartition, logDir))
for(i <- 0 until 100)
assertEquals(i, seg.read(i, Some(i + 1), 1024).records.records.iterator.next().offset)
}
@@ -303,7 +303,7 @@ class LogSegmentTest {
shallowOffsetOfMaxTimestamp = 107L, endTxnRecords(ControlRecordType.COMMIT, pid1, producerEpoch, offset = 107L))
var stateManager = new ProducerStateManager(topicPartition, logDir)
- segment.recover(64 * 1024, stateManager)
+ segment.recover(stateManager)
assertEquals(108L, stateManager.mapEndOffset)
@@ -318,7 +318,7 @@ class LogSegmentTest {
// recover again, but this time assuming the transaction from pid2 began on a previous segment
stateManager = new ProducerStateManager(topicPartition, logDir)
stateManager.loadProducerEntry(ProducerIdEntry(pid2, producerEpoch, 10, 90L, 5, RecordBatch.NO_TIMESTAMP, 0, Some(75L)))
- segment.recover(64 * 1024, stateManager)
+ segment.recover(stateManager)
assertEquals(108L, stateManager.mapEndOffset)
abortedTxns = segment.txnIndex.allAbortedTxns
@@ -352,7 +352,7 @@ class LogSegmentTest {
seg.append(i, i, i * 10, i, records(i, i.toString))
val timeIndexFile = seg.timeIndex.file
TestUtils.writeNonsenseToFile(timeIndexFile, 5, timeIndexFile.length.toInt)
- seg.recover(64*1024, new ProducerStateManager(topicPartition, logDir))
+ seg.recover(new ProducerStateManager(topicPartition, logDir))
for(i <- 0 until 100) {
assertEquals(i, seg.findOffsetByTimestamp(i * 10).get.offset)
if (i < 99)
@@ -376,7 +376,7 @@ class LogSegmentTest {
val recordPosition = seg.log.searchForOffsetWithSize(offsetToBeginCorruption, 0)
val position = recordPosition.position + TestUtils.random.nextInt(15)
TestUtils.writeNonsenseToFile(seg.log.file, position, (seg.log.file.length - position).toInt)
- seg.recover(64*1024, new ProducerStateManager(topicPartition, logDir))
+ seg.recover(new ProducerStateManager(topicPartition, logDir))
assertEquals("Should have truncated off bad messages.", (0 until offsetToBeginCorruption).toList,
seg.log.batches.asScala.map(_.lastOffset).toList)
seg.delete()
http://git-wip-us.apache.org/repos/asf/kafka/blob/dfa3c8a9/core/src/test/scala/unit/kafka/log/LogTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala
index 84ff43b..52e9140 100755
--- a/core/src/test/scala/unit/kafka/log/LogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogTest.scala
@@ -30,6 +30,7 @@ import org.junit.{After, Before, Test}
import kafka.utils._
import kafka.server.{BrokerTopicStats, KafkaConfig}
import kafka.server.epoch.{EpochEntry, LeaderEpochFileCache}
+import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.record.MemoryRecords.RecordFilter
import org.apache.kafka.common.record._
import org.apache.kafka.common.requests.FetchResponse.AbortedTransaction
@@ -221,9 +222,9 @@ class LogTest {
records.batches.asScala.foreach(_.setPartitionLeaderEpoch(0))
val filtered = ByteBuffer.allocate(2048)
- records.filterTo(new RecordFilter {
+ records.filterTo(new TopicPartition("foo", 0), new RecordFilter {
override def shouldRetain(recordBatch: RecordBatch, record: Record): Boolean = !record.hasKey
- }, filtered)
+ }, filtered, Int.MaxValue)
filtered.flip()
val filteredRecords = MemoryRecords.readableRecords(filtered)
@@ -265,9 +266,9 @@ class LogTest {
records.batches.asScala.foreach(_.setPartitionLeaderEpoch(0))
val filtered = ByteBuffer.allocate(2048)
- records.filterTo(new RecordFilter {
+ records.filterTo(new TopicPartition("foo", 0), new RecordFilter {
override def shouldRetain(recordBatch: RecordBatch, record: Record): Boolean = !record.hasKey
- }, filtered)
+ }, filtered, Int.MaxValue)
filtered.flip()
val filteredRecords = MemoryRecords.readableRecords(filtered)