You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2018/05/31 06:04:09 UTC
[kafka] branch trunk updated: KAFKA-6927;
Chunked down-conversion to prevent out of memory errors on broker
[KIP-283] (#4871)
This is an automated email from the ASF dual-hosted git repository.
jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 837f31d KAFKA-6927; Chunked down-conversion to prevent out of memory errors on broker [KIP-283] (#4871)
837f31d is described below
commit 837f31dd1850b179918f83338b4b4487486b2c58
Author: Dhruvil Shah <dh...@confluent.io>
AuthorDate: Wed May 30 23:03:51 2018 -0700
KAFKA-6927; Chunked down-conversion to prevent out of memory errors on broker [KIP-283] (#4871)
Implementation for lazy down-conversion in a chunked manner for efficient memory usage during down-conversion. This pull request is mainly to get initial feedback on the direction of the patch. The patch includes all the main components from KIP-283.
Reviewers: Jason Gustafson <ja...@confluent.io>
---
.../kafka/clients/consumer/internals/Fetcher.java | 11 +-
.../apache/kafka/common/protocol/types/Struct.java | 6 +-
.../apache/kafka/common/protocol/types/Type.java | 16 +-
.../kafka/common/record/AbstractRecords.java | 110 +----------
.../{ConvertedRecords.java => BaseRecords.java} | 30 ++-
.../kafka/common/record/ConvertedRecords.java | 10 +-
...nvertedRecords.java => DefaultRecordsSend.java} | 21 +--
.../apache/kafka/common/record/FileRecords.java | 15 +-
.../common/record/LazyDownConversionRecords.java | 168 +++++++++++++++++
.../record/LazyDownConversionRecordsSend.java | 99 ++++++++++
.../apache/kafka/common/record/MemoryRecords.java | 10 +-
.../MultiRecordsSend.java} | 43 ++++-
...essingStats.java => RecordConversionStats.java} | 24 ++-
.../org/apache/kafka/common/record/Records.java | 35 ++--
.../common/{requests => record}/RecordsSend.java | 36 +++-
.../{AbstractRecords.java => RecordsUtil.java} | 138 +-------------
.../kafka/common/requests/AbstractResponse.java | 2 +-
.../kafka/common/requests/FetchResponse.java | 78 ++++----
.../kafka/clients/consumer/KafkaConsumerTest.java | 13 +-
.../clients/consumer/internals/FetcherTest.java | 64 +++----
.../kafka/common/record/FileRecordsTest.java | 133 ++++++++++----
.../record/LazyDownConversionRecordsTest.java | 203 +++++++++++++++++++++
.../common/record/MemoryRecordsBuilderTest.java | 12 +-
.../MultiRecordsSendTest.java} | 10 +-
core/src/main/scala/kafka/log/Log.scala | 12 +-
core/src/main/scala/kafka/log/LogValidator.scala | 18 +-
.../main/scala/kafka/network/RequestChannel.scala | 81 +++++---
.../main/scala/kafka/network/SocketServer.scala | 42 +++--
.../scala/kafka/server/ClientQuotaManager.scala | 13 +-
.../src/main/scala/kafka/server/FetchSession.scala | 26 +--
core/src/main/scala/kafka/server/KafkaApis.scala | 169 +++++++++--------
.../kafka/server/ReplicaAlterLogDirsThread.scala | 19 +-
.../scala/kafka/server/ReplicaFetcherThread.scala | 6 +-
.../main/scala/kafka/server/ReplicaManager.scala | 11 +-
.../main/scala/kafka/server/ThrottledChannel.scala | 15 +-
.../kafka/api/AuthorizerIntegrationTest.scala | 21 +--
.../AbstractCoordinatorConcurrencyTest.scala | 10 +-
.../scala/unit/kafka/log/LogValidatorTest.scala | 30 +--
.../unit/kafka/network/SocketServerTest.scala | 42 +++--
.../unit/kafka/server/ClientQuotaManagerTest.scala | 45 ++++-
.../scala/unit/kafka/server/FetchRequestTest.scala | 62 ++++---
.../scala/unit/kafka/server/FetchSessionTest.scala | 15 +-
.../scala/unit/kafka/server/KafkaApisTest.scala | 8 +-
.../server/ReplicaAlterLogDirsThreadTest.scala | 4 +-
.../scala/unit/kafka/server/RequestQuotaTest.scala | 2 +-
.../server/ThrottledChannelExpirationTest.scala | 50 +++--
.../util/ReplicaFetcherMockBlockingSend.scala | 6 +-
47 files changed, 1249 insertions(+), 745 deletions(-)
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
index 38f324f..ca8e0d2 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
@@ -52,6 +52,7 @@ import org.apache.kafka.common.record.ControlRecordType;
import org.apache.kafka.common.record.InvalidRecordException;
import org.apache.kafka.common.record.Record;
import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.common.record.Records;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.requests.FetchRequest;
import org.apache.kafka.common.requests.FetchResponse;
@@ -204,7 +205,7 @@ public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable {
.addListener(new RequestFutureListener<ClientResponse>() {
@Override
public void onSuccess(ClientResponse resp) {
- FetchResponse response = (FetchResponse) resp.responseBody();
+ FetchResponse<Records> response = (FetchResponse<Records>) resp.responseBody();
FetchSessionHandler handler = sessionHandlers.get(fetchTarget.id());
if (handler == null) {
log.error("Unable to find FetchSessionHandler for node {}. Ignoring fetch response.",
@@ -218,7 +219,7 @@ public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable {
Set<TopicPartition> partitions = new HashSet<>(response.responseData().keySet());
FetchResponseMetricAggregator metricAggregator = new FetchResponseMetricAggregator(sensors, partitions);
- for (Map.Entry<TopicPartition, FetchResponse.PartitionData> entry : response.responseData().entrySet()) {
+ for (Map.Entry<TopicPartition, FetchResponse.PartitionData<Records>> entry : response.responseData().entrySet()) {
TopicPartition partition = entry.getKey();
long fetchOffset = data.sessionPartitions().get(partition).fetchOffset;
FetchResponse.PartitionData fetchData = entry.getValue();
@@ -894,7 +895,7 @@ public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable {
*/
private PartitionRecords parseCompletedFetch(CompletedFetch completedFetch) {
TopicPartition tp = completedFetch.partition;
- FetchResponse.PartitionData partition = completedFetch.partitionData;
+ FetchResponse.PartitionData<Records> partition = completedFetch.partitionData;
long fetchOffset = completedFetch.fetchedOffset;
PartitionRecords partitionRecords = null;
Errors error = partition.error;
@@ -1252,13 +1253,13 @@ public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable {
private static class CompletedFetch {
private final TopicPartition partition;
private final long fetchedOffset;
- private final FetchResponse.PartitionData partitionData;
+ private final FetchResponse.PartitionData<Records> partitionData;
private final FetchResponseMetricAggregator metricAggregator;
private final short responseVersion;
private CompletedFetch(TopicPartition partition,
long fetchedOffset,
- FetchResponse.PartitionData partitionData,
+ FetchResponse.PartitionData<Records> partitionData,
FetchResponseMetricAggregator metricAggregator,
short responseVersion) {
this.partition = partition;
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java b/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java
index ac24a1b..7dccc10 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java
@@ -16,7 +16,7 @@
*/
package org.apache.kafka.common.protocol.types;
-import org.apache.kafka.common.record.Records;
+import org.apache.kafka.common.record.BaseRecords;
import java.nio.ByteBuffer;
import java.util.Arrays;
@@ -172,8 +172,8 @@ public class Struct {
return (Byte) get(name);
}
- public Records getRecords(String name) {
- return (Records) get(name);
+ public BaseRecords getRecords(String name) {
+ return (BaseRecords) get(name);
}
public Short getShort(BoundField field) {
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/types/Type.java b/clients/src/main/java/org/apache/kafka/common/protocol/types/Type.java
index 85916d5..4bd508b 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/types/Type.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/types/Type.java
@@ -16,7 +16,7 @@
*/
package org.apache.kafka.common.protocol.types;
-import org.apache.kafka.common.record.FileRecords;
+import org.apache.kafka.common.record.BaseRecords;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.Records;
import org.apache.kafka.common.utils.ByteUtils;
@@ -549,14 +549,14 @@ public abstract class Type {
@Override
public void write(ByteBuffer buffer, Object o) {
- if (o instanceof FileRecords)
- throw new IllegalArgumentException("FileRecords must be written to the channel directly");
+ if (!(o instanceof MemoryRecords))
+ throw new IllegalArgumentException("Unexpected record type: " + o.getClass());
MemoryRecords records = (MemoryRecords) o;
NULLABLE_BYTES.write(buffer, records.buffer().duplicate());
}
@Override
- public Records read(ByteBuffer buffer) {
+ public MemoryRecords read(ByteBuffer buffer) {
ByteBuffer recordsBuffer = (ByteBuffer) NULLABLE_BYTES.read(buffer);
return MemoryRecords.readableRecords(recordsBuffer);
}
@@ -566,7 +566,7 @@ public abstract class Type {
if (o == null)
return 4;
- Records records = (Records) o;
+ BaseRecords records = (BaseRecords) o;
return 4 + records.sizeInBytes();
}
@@ -576,12 +576,12 @@ public abstract class Type {
}
@Override
- public Records validate(Object item) {
+ public BaseRecords validate(Object item) {
if (item == null)
return null;
- if (item instanceof Records)
- return (Records) item;
+ if (item instanceof BaseRecords)
+ return (BaseRecords) item;
throw new SchemaException(item + " is not an instance of " + Records.class.getName());
}
diff --git a/clients/src/main/java/org/apache/kafka/common/record/AbstractRecords.java b/clients/src/main/java/org/apache/kafka/common/record/AbstractRecords.java
index 89a5413..5e41901 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/AbstractRecords.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/AbstractRecords.java
@@ -18,13 +18,10 @@ package org.apache.kafka.common.record;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.utils.AbstractIterator;
-import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import java.nio.ByteBuffer;
-import java.util.ArrayList;
import java.util.Iterator;
-import java.util.List;
public abstract class AbstractRecords implements Records {
@@ -52,97 +49,6 @@ public abstract class AbstractRecords implements Records {
}
/**
- * Down convert batches to the provided message format version. The first offset parameter is only relevant in the
- * conversion from uncompressed v2 or higher to v1 or lower. The reason is that uncompressed records in v0 and v1
- * are not batched (put another way, each batch always has 1 record).
- *
- * If a client requests records in v1 format starting from the middle of an uncompressed batch in v2 format, we
- * need to drop records from the batch during the conversion. Some versions of librdkafka rely on this for
- * correctness.
- *
- * The temporaryMemoryBytes computation assumes that the batches are not loaded into the heap
- * (via classes like FileChannelRecordBatch) before this method is called. This is the case in the broker (we
- * only load records into the heap when down converting), but it's not for the producer. However, down converting
- * in the producer is very uncommon and the extra complexity to handle that case is not worth it.
- */
- protected ConvertedRecords<MemoryRecords> downConvert(Iterable<? extends RecordBatch> batches, byte toMagic,
- long firstOffset, Time time) {
- // maintain the batch along with the decompressed records to avoid the need to decompress again
- List<RecordBatchAndRecords> recordBatchAndRecordsList = new ArrayList<>();
- int totalSizeEstimate = 0;
- long startNanos = time.nanoseconds();
-
- for (RecordBatch batch : batches) {
- if (toMagic < RecordBatch.MAGIC_VALUE_V2 && batch.isControlBatch())
- continue;
-
- if (batch.magic() <= toMagic) {
- totalSizeEstimate += batch.sizeInBytes();
- recordBatchAndRecordsList.add(new RecordBatchAndRecords(batch, null, null));
- } else {
- List<Record> records = new ArrayList<>();
- for (Record record : batch) {
- // See the method javadoc for an explanation
- if (toMagic > RecordBatch.MAGIC_VALUE_V1 || batch.isCompressed() || record.offset() >= firstOffset)
- records.add(record);
- }
- if (records.isEmpty())
- continue;
- final long baseOffset;
- if (batch.magic() >= RecordBatch.MAGIC_VALUE_V2 && toMagic >= RecordBatch.MAGIC_VALUE_V2)
- baseOffset = batch.baseOffset();
- else
- baseOffset = records.get(0).offset();
- totalSizeEstimate += estimateSizeInBytes(toMagic, baseOffset, batch.compressionType(), records);
- recordBatchAndRecordsList.add(new RecordBatchAndRecords(batch, records, baseOffset));
- }
- }
-
- ByteBuffer buffer = ByteBuffer.allocate(totalSizeEstimate);
- long temporaryMemoryBytes = 0;
- int numRecordsConverted = 0;
- for (RecordBatchAndRecords recordBatchAndRecords : recordBatchAndRecordsList) {
- temporaryMemoryBytes += recordBatchAndRecords.batch.sizeInBytes();
- if (recordBatchAndRecords.batch.magic() <= toMagic) {
- recordBatchAndRecords.batch.writeTo(buffer);
- } else {
- MemoryRecordsBuilder builder = convertRecordBatch(toMagic, buffer, recordBatchAndRecords);
- buffer = builder.buffer();
- temporaryMemoryBytes += builder.uncompressedBytesWritten();
- numRecordsConverted += builder.numRecords();
- }
- }
-
- buffer.flip();
- RecordsProcessingStats stats = new RecordsProcessingStats(temporaryMemoryBytes, numRecordsConverted,
- time.nanoseconds() - startNanos);
- return new ConvertedRecords<>(MemoryRecords.readableRecords(buffer), stats);
- }
-
- /**
- * Return a buffer containing the converted record batches. The returned buffer may not be the same as the received
- * one (e.g. it may require expansion).
- */
- private MemoryRecordsBuilder convertRecordBatch(byte magic, ByteBuffer buffer, RecordBatchAndRecords recordBatchAndRecords) {
- RecordBatch batch = recordBatchAndRecords.batch;
- final TimestampType timestampType = batch.timestampType();
- long logAppendTime = timestampType == TimestampType.LOG_APPEND_TIME ? batch.maxTimestamp() : RecordBatch.NO_TIMESTAMP;
-
- MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, magic, batch.compressionType(),
- timestampType, recordBatchAndRecords.baseOffset, logAppendTime);
- for (Record record : recordBatchAndRecords.records) {
- // Down-convert this record. Ignore headers when down-converting to V0 and V1 since they are not supported
- if (magic > RecordBatch.MAGIC_VALUE_V1)
- builder.append(record);
- else
- builder.appendWithOffset(record.offset(), record.timestamp(), record.key(), record.value());
- }
-
- builder.close();
- return builder;
- }
-
- /**
* Get an iterator over the deep records.
* @return An iterator over the records
*/
@@ -151,6 +57,11 @@ public abstract class AbstractRecords implements Records {
return records;
}
+ @Override
+ public RecordsSend toSend(String destination) {
+ return new DefaultRecordsSend(destination, this);
+ }
+
private Iterator<Record> recordsIterator() {
return new AbstractIterator<Record>() {
private final Iterator<? extends RecordBatch> batches = batches().iterator();
@@ -241,16 +152,5 @@ public abstract class AbstractRecords implements Records {
}
}
- private static class RecordBatchAndRecords {
- private final RecordBatch batch;
- private final List<Record> records;
- private final Long baseOffset;
-
- private RecordBatchAndRecords(RecordBatch batch, List<Record> records, Long baseOffset) {
- this.batch = batch;
- this.records = records;
- this.baseOffset = baseOffset;
- }
- }
}
diff --git a/clients/src/main/java/org/apache/kafka/common/record/ConvertedRecords.java b/clients/src/main/java/org/apache/kafka/common/record/BaseRecords.java
similarity index 62%
copy from clients/src/main/java/org/apache/kafka/common/record/ConvertedRecords.java
copy to clients/src/main/java/org/apache/kafka/common/record/BaseRecords.java
index fe37c48..3ebaf79 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/ConvertedRecords.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/BaseRecords.java
@@ -16,21 +16,19 @@
*/
package org.apache.kafka.common.record;
-public class ConvertedRecords<T extends Records> {
-
- private final T records;
- private final RecordsProcessingStats recordsProcessingStats;
-
- public ConvertedRecords(T records, RecordsProcessingStats recordsProcessingStats) {
- this.records = records;
- this.recordsProcessingStats = recordsProcessingStats;
- }
-
- public T records() {
- return records;
- }
+/**
+ * Base interface for accessing records which could be contained in the log, or an in-memory materialization of log records.
+ */
+public interface BaseRecords {
+ /**
+ * The size of these records in bytes.
+ * @return The size in bytes of the records
+ */
+ int sizeInBytes();
- public RecordsProcessingStats recordsProcessingStats() {
- return recordsProcessingStats;
- }
+ /**
+ * Encapsulate this {@link BaseRecords} object into {@link RecordsSend}
+ * @return Initialized {@link RecordsSend} object
+ */
+ RecordsSend toSend(String destination);
}
diff --git a/clients/src/main/java/org/apache/kafka/common/record/ConvertedRecords.java b/clients/src/main/java/org/apache/kafka/common/record/ConvertedRecords.java
index fe37c48..d9150e5 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/ConvertedRecords.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/ConvertedRecords.java
@@ -19,18 +19,18 @@ package org.apache.kafka.common.record;
public class ConvertedRecords<T extends Records> {
private final T records;
- private final RecordsProcessingStats recordsProcessingStats;
+ private final RecordConversionStats recordConversionStats;
- public ConvertedRecords(T records, RecordsProcessingStats recordsProcessingStats) {
+ public ConvertedRecords(T records, RecordConversionStats recordConversionStats) {
this.records = records;
- this.recordsProcessingStats = recordsProcessingStats;
+ this.recordConversionStats = recordConversionStats;
}
public T records() {
return records;
}
- public RecordsProcessingStats recordsProcessingStats() {
- return recordsProcessingStats;
+ public RecordConversionStats recordConversionStats() {
+ return recordConversionStats;
}
}
diff --git a/clients/src/main/java/org/apache/kafka/common/record/ConvertedRecords.java b/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordsSend.java
similarity index 58%
copy from clients/src/main/java/org/apache/kafka/common/record/ConvertedRecords.java
copy to clients/src/main/java/org/apache/kafka/common/record/DefaultRecordsSend.java
index fe37c48..aa715ea 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/ConvertedRecords.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordsSend.java
@@ -16,21 +16,20 @@
*/
package org.apache.kafka.common.record;
-public class ConvertedRecords<T extends Records> {
+import java.io.IOException;
+import java.nio.channels.GatheringByteChannel;
- private final T records;
- private final RecordsProcessingStats recordsProcessingStats;
-
- public ConvertedRecords(T records, RecordsProcessingStats recordsProcessingStats) {
- this.records = records;
- this.recordsProcessingStats = recordsProcessingStats;
+public class DefaultRecordsSend extends RecordsSend<Records> {
+ public DefaultRecordsSend(String destination, Records records) {
+ this(destination, records, records.sizeInBytes());
}
- public T records() {
- return records;
+ public DefaultRecordsSend(String destination, Records records, int maxBytesToWrite) {
+ super(destination, records, maxBytesToWrite);
}
- public RecordsProcessingStats recordsProcessingStats() {
- return recordsProcessingStats;
+ @Override
+ protected long writeTo(GatheringByteChannel channel, long previouslyWritten, int remaining) throws IOException {
+ return records().writeTo(channel, previouslyWritten, remaining);
}
}
diff --git a/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java b/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java
index 6b6e0ab..e44d5d9 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java
@@ -19,6 +19,7 @@ package org.apache.kafka.common.record;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.network.TransportLayer;
import org.apache.kafka.common.record.FileLogInputStream.FileChannelRecordBatch;
+import org.apache.kafka.common.utils.AbstractIterator;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
@@ -239,8 +240,8 @@ public class FileRecords extends AbstractRecords implements Closeable {
@Override
public ConvertedRecords<? extends Records> downConvert(byte toMagic, long firstOffset, Time time) {
- ConvertedRecords<MemoryRecords> convertedRecords = downConvert(batches, toMagic, firstOffset, time);
- if (convertedRecords.recordsProcessingStats().numRecordsConverted() == 0) {
+ ConvertedRecords<MemoryRecords> convertedRecords = RecordsUtil.downConvert(batches, toMagic, firstOffset, time);
+ if (convertedRecords.recordConversionStats().numRecordsConverted() == 0) {
// This indicates that the message is too large, which means that the buffer is not large
// enough to hold a full record batch. We just return all the bytes in this instance.
// Even though the record batch does not have the right format version, we expect old clients
@@ -248,7 +249,7 @@ public class FileRecords extends AbstractRecords implements Closeable {
// are not enough available bytes in the response to read it fully. Note that this is
// only possible prior to KIP-74, after which the broker was changed to always return at least
// one full record batch, even if it requires exceeding the max fetch size requested by the client.
- return new ConvertedRecords<>(this, RecordsProcessingStats.EMPTY);
+ return new ConvertedRecords<>(this, RecordConversionStats.EMPTY);
} else {
return convertedRecords;
}
@@ -364,7 +365,12 @@ public class FileRecords extends AbstractRecords implements Closeable {
};
}
- private Iterator<FileChannelRecordBatch> batchIterator(int start) {
+ @Override
+ public AbstractIterator<FileChannelRecordBatch> batchIterator() {
+ return batchIterator(start);
+ }
+
+ private AbstractIterator<FileChannelRecordBatch> batchIterator(int start) {
final int end;
if (isSlice)
end = this.end;
@@ -510,5 +516,4 @@ public class FileRecords extends AbstractRecords implements Closeable {
')';
}
}
-
}
diff --git a/clients/src/main/java/org/apache/kafka/common/record/LazyDownConversionRecords.java b/clients/src/main/java/org/apache/kafka/common/record/LazyDownConversionRecords.java
new file mode 100644
index 0000000..da14b5b
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/record/LazyDownConversionRecords.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.record;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.utils.AbstractIterator;
+import org.apache.kafka.common.utils.Time;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * Encapsulation for holding records that require down-conversion in a lazy, chunked manner (KIP-283). See
+ * {@link LazyDownConversionRecordsSend} for the actual chunked send implementation.
+ */
+public class LazyDownConversionRecords implements BaseRecords {
+ private final TopicPartition topicPartition;
+ private final Records records;
+ private final byte toMagic;
+ private final long firstOffset;
+ private ConvertedRecords firstConvertedBatch;
+ private final int sizeInBytes;
+ private final Time time;
+
+ /**
+ * @param topicPartition The topic-partition to which records belong
+ * @param records Records to lazily down-convert
+ * @param toMagic Magic version to down-convert to
+ * @param firstOffset The starting offset for down-converted records. This only impacts some cases. See
+ * {@link RecordsUtil#downConvert(Iterable, byte, long, Time)} for an explanation.
+ * @param time The time instance to use
+ */
+ public LazyDownConversionRecords(TopicPartition topicPartition, Records records, byte toMagic, long firstOffset, Time time) {
+ this.topicPartition = Objects.requireNonNull(topicPartition);
+ this.records = Objects.requireNonNull(records);
+ this.toMagic = toMagic;
+ this.firstOffset = firstOffset;
+ this.time = Objects.requireNonNull(time);
+
+ // Kafka consumers expect at least one full batch of messages for every topic-partition. To guarantee this, we
+ // need to make sure that we are able to accommodate one full batch of down-converted messages. The way we achieve
+ // this is by having sizeInBytes method factor in the size of the first down-converted batch and return at least
+ // its size.
+ AbstractIterator<? extends RecordBatch> it = records.batchIterator();
+ if (it.hasNext()) {
+ firstConvertedBatch = RecordsUtil.downConvert(Collections.singletonList(it.peek()), toMagic, firstOffset, time);
+ sizeInBytes = Math.max(records.sizeInBytes(), firstConvertedBatch.records().sizeInBytes());
+ } else {
+ firstConvertedBatch = null;
+ sizeInBytes = 0;
+ }
+ }
+
+ @Override
+ public int sizeInBytes() {
+ return sizeInBytes;
+ }
+
+ @Override
+ public LazyDownConversionRecordsSend toSend(String destination) {
+ return new LazyDownConversionRecordsSend(destination, this);
+ }
+
+ public TopicPartition topicPartition() {
+ return topicPartition;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (o instanceof LazyDownConversionRecords) {
+ LazyDownConversionRecords that = (LazyDownConversionRecords) o;
+ return toMagic == that.toMagic &&
+ firstOffset == that.firstOffset &&
+ topicPartition.equals(that.topicPartition) &&
+ records.equals(that.records);
+ }
+ return false;
+ }
+
+ @Override
+ public int hashCode() {
+ int result = toMagic;
+ result = 31 * result + (int) (firstOffset ^ (firstOffset >>> 32));
+ result = 31 * result + topicPartition.hashCode();
+ result = 31 * result + records.hashCode();
+ return result;
+ }
+
+ public java.util.Iterator<ConvertedRecords> iterator(long maximumReadSize) {
+ // We typically expect only one iterator instance to be created, so null out the first converted batch after
+ // first use to make it available for GC.
+ ConvertedRecords firstBatch = firstConvertedBatch;
+ firstConvertedBatch = null;
+ return new Iterator(records, maximumReadSize, firstBatch);
+ }
+
+ /**
+ * Implementation for being able to iterate over down-converted records. Goal of this implementation is to keep
+ * it as memory-efficient as possible by not having to maintain all down-converted records in-memory. Maintains
+ * a view into batches of down-converted records.
+ */
+ private class Iterator extends AbstractIterator<ConvertedRecords> {
+ private final AbstractIterator<? extends RecordBatch> batchIterator;
+ private final long maximumReadSize;
+ private ConvertedRecords firstConvertedBatch;
+
+ /**
+ * @param recordsToDownConvert Records that require down-conversion
+ * @param maximumReadSize Maximum possible size of underlying records that will be down-converted in each call to
+ * {@link #makeNext()}. This is a soft limit as {@link #makeNext()} will always convert
+ * and return at least one full message batch.
+ */
+ private Iterator(Records recordsToDownConvert, long maximumReadSize, ConvertedRecords firstConvertedBatch) {
+ this.batchIterator = recordsToDownConvert.batchIterator();
+ this.maximumReadSize = maximumReadSize;
+ this.firstConvertedBatch = firstConvertedBatch;
+ // If we already have the first down-converted batch, advance the underlying records iterator to next batch
+ if (firstConvertedBatch != null)
+ this.batchIterator.next();
+ }
+
+ /**
+ * Make next set of down-converted records
+ * @return Down-converted records
+ */
+ @Override
+ protected ConvertedRecords makeNext() {
+ // If we have cached the first down-converted batch, return that now
+ if (firstConvertedBatch != null) {
+ ConvertedRecords convertedBatch = firstConvertedBatch;
+ firstConvertedBatch = null;
+ return convertedBatch;
+ }
+
+ if (!batchIterator.hasNext())
+ return allDone();
+
+ // Figure out batches we should down-convert based on the size constraints
+ List<RecordBatch> batches = new ArrayList<>();
+ boolean isFirstBatch = true;
+ long sizeSoFar = 0;
+ while (batchIterator.hasNext() &&
+ (isFirstBatch || (batchIterator.peek().sizeInBytes() + sizeSoFar) <= maximumReadSize)) {
+ RecordBatch currentBatch = batchIterator.next();
+ batches.add(currentBatch);
+ sizeSoFar += currentBatch.sizeInBytes();
+ isFirstBatch = false;
+ }
+ return RecordsUtil.downConvert(batches, toMagic, firstOffset, time);
+ }
+ }
+}
diff --git a/clients/src/main/java/org/apache/kafka/common/record/LazyDownConversionRecordsSend.java b/clients/src/main/java/org/apache/kafka/common/record/LazyDownConversionRecordsSend.java
new file mode 100644
index 0000000..b782114
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/record/LazyDownConversionRecordsSend.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.record;
+
+import org.apache.kafka.common.TopicPartition;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.GatheringByteChannel;
+import java.util.Iterator;
+
+/**
+ * Encapsulation for {@link RecordsSend} for {@link LazyDownConversionRecords}. Records are down-converted in batches and
+ * on-demand when {@link #writeTo} method is called.
+ */
+public final class LazyDownConversionRecordsSend extends RecordsSend<LazyDownConversionRecords> {
+ private static final Logger log = LoggerFactory.getLogger(LazyDownConversionRecordsSend.class);
+ private static final int MAX_READ_SIZE = 128 * 1024;
+
+ private RecordConversionStats recordConversionStats;
+ private RecordsSend convertedRecordsWriter;
+ private Iterator<ConvertedRecords> convertedRecordsIterator;
+
+ public LazyDownConversionRecordsSend(String destination, LazyDownConversionRecords records) {
+ super(destination, records, records.sizeInBytes());
+ convertedRecordsWriter = null;
+ recordConversionStats = new RecordConversionStats();
+ convertedRecordsIterator = records().iterator(MAX_READ_SIZE);
+ }
+
+ @Override
+ public long writeTo(GatheringByteChannel channel, long previouslyWritten, int remaining) throws IOException {
+ if (convertedRecordsWriter == null || convertedRecordsWriter.completed()) {
+ MemoryRecords convertedRecords;
+
+ // Check if we have more chunks left to down-convert
+ if (convertedRecordsIterator.hasNext()) {
+ // Get next chunk of down-converted messages
+ ConvertedRecords<MemoryRecords> recordsAndStats = convertedRecordsIterator.next();
+ convertedRecords = recordsAndStats.records();
+
+ int sizeOfFirstConvertedBatch = convertedRecords.batchIterator().next().sizeInBytes();
+ if (previouslyWritten == 0 && sizeOfFirstConvertedBatch > size())
+ throw new EOFException("Unable to send first batch completely." +
+ " maximum_size: " + size() +
+ " converted_records_size: " + sizeOfFirstConvertedBatch);
+
+ recordConversionStats.add(recordsAndStats.recordConversionStats());
+ log.debug("Got lazy converted records for {" + topicPartition() + "} with length=" + convertedRecords.sizeInBytes());
+ } else {
+ if (previouslyWritten == 0)
+ throw new EOFException("Unable to get the first batch of down-converted records");
+
+ // We do not have any records left to down-convert. Construct a "fake" message for the length remaining.
+ // This message will be ignored by the consumer because its length will exceed the maximum
+ // possible response size.
+ // DefaultRecordBatch =>
+ // BaseOffset => Int64
+ // Length => Int32
+ // ...
+ // TODO: check if there is a better way to encapsulate this logic, perhaps in DefaultRecordBatch
+ log.debug("Constructing fake message batch for topic-partition {" + topicPartition() + "} for remaining length " + remaining);
+ int minLength = (Long.SIZE / Byte.SIZE) + (Integer.SIZE / Byte.SIZE);
+ ByteBuffer fakeMessageBatch = ByteBuffer.allocate(Math.max(minLength, Math.min(remaining + 1, MAX_READ_SIZE)));
+ fakeMessageBatch.putLong(-1L);
+ fakeMessageBatch.putInt(remaining + 1);
+ convertedRecords = MemoryRecords.readableRecords(fakeMessageBatch);
+ }
+
+ convertedRecordsWriter = new DefaultRecordsSend(destination(), convertedRecords, Math.min(convertedRecords.sizeInBytes(), remaining));
+ }
+ return convertedRecordsWriter.writeTo(channel);
+ }
+
+ public RecordConversionStats recordConversionStats() {
+ return recordConversionStats;
+ }
+
+ public TopicPartition topicPartition() {
+ return records().topicPartition();
+ }
+}
diff --git a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
index be7ea62..55a4711 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
@@ -20,6 +20,7 @@ import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.CorruptRecordException;
import org.apache.kafka.common.record.MemoryRecords.RecordFilter.BatchRetention;
+import org.apache.kafka.common.utils.AbstractIterator;
import org.apache.kafka.common.utils.ByteBufferOutputStream;
import org.apache.kafka.common.utils.CloseableIterator;
import org.apache.kafka.common.utils.Time;
@@ -49,7 +50,7 @@ public class MemoryRecords extends AbstractRecords {
private final Iterable<MutableRecordBatch> batches = new Iterable<MutableRecordBatch>() {
@Override
public Iterator<MutableRecordBatch> iterator() {
- return new RecordBatchIterator<>(new ByteBufferLogInputStream(buffer.duplicate(), Integer.MAX_VALUE));
+ return batchIterator();
}
};
@@ -115,7 +116,12 @@ public class MemoryRecords extends AbstractRecords {
@Override
public ConvertedRecords<MemoryRecords> downConvert(byte toMagic, long firstOffset, Time time) {
- return downConvert(batches(), toMagic, firstOffset, time);
+ return RecordsUtil.downConvert(batches(), toMagic, firstOffset, time);
+ }
+
+ @Override
+ public AbstractIterator<MutableRecordBatch> batchIterator() {
+ return new RecordBatchIterator<>(new ByteBufferLogInputStream(buffer.duplicate(), Integer.MAX_VALUE));
}
/**
diff --git a/clients/src/main/java/org/apache/kafka/common/network/MultiSend.java b/clients/src/main/java/org/apache/kafka/common/record/MultiRecordsSend.java
similarity index 59%
rename from clients/src/main/java/org/apache/kafka/common/network/MultiSend.java
rename to clients/src/main/java/org/apache/kafka/common/record/MultiRecordsSend.java
index 6b66360..2bc8d1c 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/MultiSend.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/MultiRecordsSend.java
@@ -14,34 +14,39 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.kafka.common.network;
+package org.apache.kafka.common.record;
import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.network.Send;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.nio.channels.GatheringByteChannel;
+import java.util.HashMap;
+import java.util.Map;
import java.util.Queue;
/**
- * A set of composite sends, sent one after another
+ * A set of composite sends with nested {@link RecordsSend}, sent one after another
*/
-public class MultiSend implements Send {
- private static final Logger log = LoggerFactory.getLogger(MultiSend.class);
+public class MultiRecordsSend implements Send {
+ private static final Logger log = LoggerFactory.getLogger(MultiRecordsSend.class);
private final String dest;
private final Queue<Send> sendQueue;
private final long size;
+ private Map<TopicPartition, RecordConversionStats> recordConversionStats;
private long totalWritten = 0;
private Send current;
/**
- * Construct a MultiSend for the given destination from a queue of Send objects. The queue will be
- * consumed as the MultiSend progresses (on completion, it will be empty).
+ * Construct a MultiRecordsSend for the given destination from a queue of Send objects. The queue will be
+ * consumed as the MultiRecordsSend progresses (on completion, it will be empty).
*/
- public MultiSend(String dest, Queue<Send> sends) {
+ public MultiRecordsSend(String dest, Queue<Send> sends) {
this.dest = dest;
this.sendQueue = sends;
@@ -88,8 +93,10 @@ public class MultiSend implements Send {
long written = current.writeTo(channel);
totalWrittenPerCall += written;
sendComplete = current.completed();
- if (sendComplete)
+ if (sendComplete) {
+ updateRecordConversionStats(current);
current = sendQueue.poll();
+ }
} while (!completed() && sendComplete);
totalWritten += totalWrittenPerCall;
@@ -103,4 +110,24 @@ public class MultiSend implements Send {
return totalWrittenPerCall;
}
+ /**
+ * Get any statistics that were recorded as part of executing this {@link MultiRecordsSend}.
+ * @return Records processing statistics (could be null if no statistics were collected)
+ */
+ public Map<TopicPartition, RecordConversionStats> recordConversionStats() {
+ return recordConversionStats;
+ }
+
+ private void updateRecordConversionStats(Send completedSend) {
+ // The underlying send might have accumulated statistics that need to be recorded. For example,
+ // LazyDownConversionRecordsSend accumulates statistics related to the number of bytes down-converted, the amount
+ // of temporary memory used for down-conversion, etc. Pull out any such statistics from the underlying send
+ // and fold it up appropriately.
+ if (completedSend instanceof LazyDownConversionRecordsSend) {
+ if (recordConversionStats == null)
+ recordConversionStats = new HashMap<>();
+ LazyDownConversionRecordsSend lazyRecordsSend = (LazyDownConversionRecordsSend) completedSend;
+ recordConversionStats.put(lazyRecordsSend.topicPartition(), lazyRecordsSend.recordConversionStats());
+ }
+ }
}
diff --git a/clients/src/main/java/org/apache/kafka/common/record/RecordsProcessingStats.java b/clients/src/main/java/org/apache/kafka/common/record/RecordConversionStats.java
similarity index 71%
rename from clients/src/main/java/org/apache/kafka/common/record/RecordsProcessingStats.java
rename to clients/src/main/java/org/apache/kafka/common/record/RecordConversionStats.java
index e104bc8..4f0bca5 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/RecordsProcessingStats.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/RecordConversionStats.java
@@ -16,20 +16,30 @@
*/
package org.apache.kafka.common.record;
-public class RecordsProcessingStats {
+public class RecordConversionStats {
- public static final RecordsProcessingStats EMPTY = new RecordsProcessingStats(0L, 0, -1);
+ public static final RecordConversionStats EMPTY = new RecordConversionStats();
- private final long temporaryMemoryBytes;
- private final int numRecordsConverted;
- private final long conversionTimeNanos;
+ private long temporaryMemoryBytes;
+ private int numRecordsConverted;
+ private long conversionTimeNanos;
- public RecordsProcessingStats(long temporaryMemoryBytes, int numRecordsConverted, long conversionTimeNanos) {
+ public RecordConversionStats(long temporaryMemoryBytes, int numRecordsConverted, long conversionTimeNanos) {
this.temporaryMemoryBytes = temporaryMemoryBytes;
this.numRecordsConverted = numRecordsConverted;
this.conversionTimeNanos = conversionTimeNanos;
}
+ public RecordConversionStats() {
+ this(0, 0, 0);
+ }
+
+ public void add(RecordConversionStats stats) {
+ temporaryMemoryBytes += stats.temporaryMemoryBytes;
+ numRecordsConverted += stats.numRecordsConverted;
+ conversionTimeNanos += stats.conversionTimeNanos;
+ }
+
/**
* Returns the number of temporary memory bytes allocated to process the records.
* This size depends on whether the records need decompression and/or conversion:
@@ -54,7 +64,7 @@ public class RecordsProcessingStats {
@Override
public String toString() {
- return String.format("RecordsProcessingStats(temporaryMemoryBytes=%d, numRecordsConverted=%d, conversionTimeNanos=%d)",
+ return String.format("RecordConversionStats(temporaryMemoryBytes=%d, numRecordsConverted=%d, conversionTimeNanos=%d)",
temporaryMemoryBytes, numRecordsConverted, conversionTimeNanos);
}
}
diff --git a/clients/src/main/java/org/apache/kafka/common/record/Records.java b/clients/src/main/java/org/apache/kafka/common/record/Records.java
index 19152ba..23607b4 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/Records.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/Records.java
@@ -16,10 +16,13 @@
*/
package org.apache.kafka.common.record;
+import org.apache.kafka.common.utils.AbstractIterator;
+import org.apache.kafka.common.utils.Time;
+
import java.io.IOException;
import java.nio.channels.GatheringByteChannel;
+import java.util.Iterator;
-import org.apache.kafka.common.utils.Time;
/**
* Interface for accessing the records contained in a log. The log itself is represented as a sequence of record
@@ -28,20 +31,19 @@ import org.apache.kafka.common.utils.Time;
* For magic versions 1 and below, each batch consists of an 8 byte offset, a 4 byte record size, and a "shallow"
* {@link Record record}. If the batch is not compressed, then each batch will have only the shallow record contained
* inside it. If it is compressed, the batch contains "deep" records, which are packed into the value field of the
- * shallow record. To iterate over the shallow batches, use {@link #batches()}; for the deep records, use
- * {@link #records()}. Note that the deep iterator handles both compressed and non-compressed batches: if the batch is
- * not compressed, the shallow record is returned; otherwise, the shallow batch is decompressed and the deep records
- * are returned.
+ * shallow record. To iterate over the shallow batches, use {@link Records#batches()}; for the deep records, use
+ * {@link Records#records()}. Note that the deep iterator handles both compressed and non-compressed batches:
+ * if the batch is not compressed, the shallow record is returned; otherwise, the shallow batch is decompressed and the
+ * deep records are returned.
*
* For magic version 2, every batch contains 1 or more log record, regardless of compression. You can iterate
- * over the batches directly using {@link #batches()}. Records can be iterated either directly from an individual
- * batch or through {@link #records()}. Just as in previous versions, iterating over the records typically involves
+ * over the batches directly using {@link Records#batches()}. Records can be iterated either directly from an individual
+ * batch or through {@link Records#records()}. Just as in previous versions, iterating over the records typically involves
* decompression and should therefore be used with caution.
*
* See {@link MemoryRecords} for the in-memory representation and {@link FileRecords} for the on-disk representation.
*/
-public interface Records {
-
+public interface Records extends BaseRecords {
int OFFSET_OFFSET = 0;
int OFFSET_LENGTH = 8;
int SIZE_OFFSET = OFFSET_OFFSET + OFFSET_LENGTH;
@@ -55,12 +57,6 @@ public interface Records {
int HEADER_SIZE_UP_TO_MAGIC = MAGIC_OFFSET + MAGIC_LENGTH;
/**
- * The size of these records in bytes.
- * @return The size in bytes of the records
- */
- int sizeInBytes();
-
- /**
* Attempts to write the contents of this buffer to a channel.
* @param channel The channel to write to
* @param position The position in the buffer to write from
@@ -80,6 +76,13 @@ public interface Records {
Iterable<? extends RecordBatch> batches();
/**
+ * Get an iterator over the record batches. This is similar to {@link #batches()} but returns an {@link AbstractIterator}
+ * instead of {@link Iterator}, so that clients can use methods like {@link AbstractIterator#peek() peek}.
+ * @return An iterator over the record batches of the log
+ */
+ AbstractIterator<? extends RecordBatch> batchIterator();
+
+ /**
* Check whether all batches in this buffer have a certain magic value.
* @param magic The magic value to check
* @return true if all record batches have a matching magic value, false otherwise
@@ -99,7 +102,7 @@ public interface Records {
* deep iteration since all of the deep records must also be converted to the desired format.
* @param toMagic The magic value to convert to
* @param firstOffset The starting offset for returned records. This only impacts some cases. See
- * {@link AbstractRecords#downConvert(Iterable, byte, long, Time) for an explanation.
+ * {@link RecordsUtil#downConvert(Iterable, byte, long, Time)} for an explanation.
* @param time instance used for reporting stats
* @return A ConvertedRecords instance which may or may not contain the same instance in its records field.
*/
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/RecordsSend.java b/clients/src/main/java/org/apache/kafka/common/record/RecordsSend.java
similarity index 55%
rename from clients/src/main/java/org/apache/kafka/common/requests/RecordsSend.java
rename to clients/src/main/java/org/apache/kafka/common/record/RecordsSend.java
index 6608e9b..b40d6e6 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/RecordsSend.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/RecordsSend.java
@@ -14,29 +14,30 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.kafka.common.requests;
+package org.apache.kafka.common.record;
import org.apache.kafka.common.network.Send;
import org.apache.kafka.common.network.TransportLayers;
-import org.apache.kafka.common.record.Records;
import java.io.EOFException;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.GatheringByteChannel;
-public class RecordsSend implements Send {
+public abstract class RecordsSend<T extends BaseRecords> implements Send {
private static final ByteBuffer EMPTY_BYTE_BUFFER = ByteBuffer.allocate(0);
private final String destination;
- private final Records records;
+ private final T records;
+ private final int maxBytesToWrite;
private int remaining;
private boolean pending = false;
- public RecordsSend(String destination, Records records) {
+ protected RecordsSend(String destination, T records, int maxBytesToWrite) {
this.destination = destination;
this.records = records;
- this.remaining = records.sizeInBytes();
+ this.maxBytesToWrite = maxBytesToWrite;
+ this.remaining = maxBytesToWrite;
}
@Override
@@ -50,11 +51,11 @@ public class RecordsSend implements Send {
}
@Override
- public long writeTo(GatheringByteChannel channel) throws IOException {
+ public final long writeTo(GatheringByteChannel channel) throws IOException {
long written = 0;
if (remaining > 0) {
- written = records.writeTo(channel, size() - remaining, remaining);
+ written = writeTo(channel, size() - remaining, remaining);
if (written < 0)
throw new EOFException("Wrote negative bytes to channel. This shouldn't happen.");
remaining -= written;
@@ -69,6 +70,23 @@ public class RecordsSend implements Send {
@Override
public long size() {
- return records.sizeInBytes();
+ return maxBytesToWrite;
}
+
+ protected T records() {
+ return records;
+ }
+
+ /**
+ * Write records up to `remaining` bytes to `channel`. The implementation is allowed to be stateful. The contract
+ * from the caller is that the first invocation will be with `previouslyWritten` equal to 0, and `remaining` equal to
+ * the maximum bytes we want to write to `channel`. `previouslyWritten` and `remaining` will be adjusted
+ * appropriately for every subsequent invocation. See {@link #writeTo} for an example of expected usage.
+ * @param channel The channel to write to
+ * @param previouslyWritten Bytes written in previous calls to {@link #writeTo(GatheringByteChannel, long, int)}; 0 if being called for the first time
+ * @param remaining Number of bytes remaining to be written
+ * @return The number of bytes actually written
+ * @throws IOException For any IO errors
+ */
+ protected abstract long writeTo(GatheringByteChannel channel, long previouslyWritten, int remaining) throws IOException;
}
diff --git a/clients/src/main/java/org/apache/kafka/common/record/AbstractRecords.java b/clients/src/main/java/org/apache/kafka/common/record/RecordsUtil.java
similarity index 50%
copy from clients/src/main/java/org/apache/kafka/common/record/AbstractRecords.java
copy to clients/src/main/java/org/apache/kafka/common/record/RecordsUtil.java
index 89a5413..c9b7394 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/AbstractRecords.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/RecordsUtil.java
@@ -16,41 +16,13 @@
*/
package org.apache.kafka.common.record;
-import org.apache.kafka.common.header.Header;
-import org.apache.kafka.common.utils.AbstractIterator;
import org.apache.kafka.common.utils.Time;
-import org.apache.kafka.common.utils.Utils;
import java.nio.ByteBuffer;
import java.util.ArrayList;
-import java.util.Iterator;
import java.util.List;
-public abstract class AbstractRecords implements Records {
-
- private final Iterable<Record> records = new Iterable<Record>() {
- @Override
- public Iterator<Record> iterator() {
- return recordsIterator();
- }
- };
-
- @Override
- public boolean hasMatchingMagic(byte magic) {
- for (RecordBatch batch : batches())
- if (batch.magic() != magic)
- return false;
- return true;
- }
-
- @Override
- public boolean hasCompatibleMagic(byte magic) {
- for (RecordBatch batch : batches())
- if (batch.magic() > magic)
- return false;
- return true;
- }
-
+public class RecordsUtil {
/**
* Down convert batches to the provided message format version. The first offset parameter is only relevant in the
* conversion from uncompressed v2 or higher to v1 or lower. The reason is that uncompressed records in v0 and v1
@@ -65,8 +37,8 @@ public abstract class AbstractRecords implements Records {
* only load records into the heap when down converting), but it's not for the producer. However, down converting
* in the producer is very uncommon and the extra complexity to handle that case is not worth it.
*/
- protected ConvertedRecords<MemoryRecords> downConvert(Iterable<? extends RecordBatch> batches, byte toMagic,
- long firstOffset, Time time) {
+ protected static ConvertedRecords<MemoryRecords> downConvert(Iterable<? extends RecordBatch> batches, byte toMagic,
+ long firstOffset, Time time) {
// maintain the batch along with the decompressed records to avoid the need to decompress again
List<RecordBatchAndRecords> recordBatchAndRecordsList = new ArrayList<>();
int totalSizeEstimate = 0;
@@ -93,7 +65,7 @@ public abstract class AbstractRecords implements Records {
baseOffset = batch.baseOffset();
else
baseOffset = records.get(0).offset();
- totalSizeEstimate += estimateSizeInBytes(toMagic, baseOffset, batch.compressionType(), records);
+ totalSizeEstimate += AbstractRecords.estimateSizeInBytes(toMagic, baseOffset, batch.compressionType(), records);
recordBatchAndRecordsList.add(new RecordBatchAndRecords(batch, records, baseOffset));
}
}
@@ -114,7 +86,7 @@ public abstract class AbstractRecords implements Records {
}
buffer.flip();
- RecordsProcessingStats stats = new RecordsProcessingStats(temporaryMemoryBytes, numRecordsConverted,
+ RecordConversionStats stats = new RecordConversionStats(temporaryMemoryBytes, numRecordsConverted,
time.nanoseconds() - startNanos);
return new ConvertedRecords<>(MemoryRecords.readableRecords(buffer), stats);
}
@@ -123,7 +95,7 @@ public abstract class AbstractRecords implements Records {
* Return a buffer containing the converted record batches. The returned buffer may not be the same as the received
* one (e.g. it may require expansion).
*/
- private MemoryRecordsBuilder convertRecordBatch(byte magic, ByteBuffer buffer, RecordBatchAndRecords recordBatchAndRecords) {
+ private static MemoryRecordsBuilder convertRecordBatch(byte magic, ByteBuffer buffer, RecordBatchAndRecords recordBatchAndRecords) {
RecordBatch batch = recordBatchAndRecords.batch;
final TimestampType timestampType = batch.timestampType();
long logAppendTime = timestampType == TimestampType.LOG_APPEND_TIME ? batch.maxTimestamp() : RecordBatch.NO_TIMESTAMP;
@@ -142,104 +114,6 @@ public abstract class AbstractRecords implements Records {
return builder;
}
- /**
- * Get an iterator over the deep records.
- * @return An iterator over the records
- */
- @Override
- public Iterable<Record> records() {
- return records;
- }
-
- private Iterator<Record> recordsIterator() {
- return new AbstractIterator<Record>() {
- private final Iterator<? extends RecordBatch> batches = batches().iterator();
- private Iterator<Record> records;
-
- @Override
- protected Record makeNext() {
- if (records != null && records.hasNext())
- return records.next();
-
- if (batches.hasNext()) {
- records = batches.next().iterator();
- return makeNext();
- }
-
- return allDone();
- }
- };
- }
-
- public static int estimateSizeInBytes(byte magic,
- long baseOffset,
- CompressionType compressionType,
- Iterable<Record> records) {
- int size = 0;
- if (magic <= RecordBatch.MAGIC_VALUE_V1) {
- for (Record record : records)
- size += Records.LOG_OVERHEAD + LegacyRecord.recordSize(magic, record.key(), record.value());
- } else {
- size = DefaultRecordBatch.sizeInBytes(baseOffset, records);
- }
- return estimateCompressedSizeInBytes(size, compressionType);
- }
-
- public static int estimateSizeInBytes(byte magic,
- CompressionType compressionType,
- Iterable<SimpleRecord> records) {
- int size = 0;
- if (magic <= RecordBatch.MAGIC_VALUE_V1) {
- for (SimpleRecord record : records)
- size += Records.LOG_OVERHEAD + LegacyRecord.recordSize(magic, record.key(), record.value());
- } else {
- size = DefaultRecordBatch.sizeInBytes(records);
- }
- return estimateCompressedSizeInBytes(size, compressionType);
- }
-
- private static int estimateCompressedSizeInBytes(int size, CompressionType compressionType) {
- return compressionType == CompressionType.NONE ? size : Math.min(Math.max(size / 2, 1024), 1 << 16);
- }
-
- /**
- * Get an upper bound estimate on the batch size needed to hold a record with the given fields. This is only
- * an estimate because it does not take into account overhead from the compression algorithm.
- */
- public static int estimateSizeInBytesUpperBound(byte magic, CompressionType compressionType, byte[] key, byte[] value, Header[] headers) {
- return estimateSizeInBytesUpperBound(magic, compressionType, Utils.wrapNullable(key), Utils.wrapNullable(value), headers);
- }
-
- /**
- * Get an upper bound estimate on the batch size needed to hold a record with the given fields. This is only
- * an estimate because it does not take into account overhead from the compression algorithm.
- */
- public static int estimateSizeInBytesUpperBound(byte magic, CompressionType compressionType, ByteBuffer key,
- ByteBuffer value, Header[] headers) {
- if (magic >= RecordBatch.MAGIC_VALUE_V2)
- return DefaultRecordBatch.estimateBatchSizeUpperBound(key, value, headers);
- else if (compressionType != CompressionType.NONE)
- return Records.LOG_OVERHEAD + LegacyRecord.recordOverhead(magic) + LegacyRecord.recordSize(magic, key, value);
- else
- return Records.LOG_OVERHEAD + LegacyRecord.recordSize(magic, key, value);
- }
-
- /**
- * Return the size of the record batch header.
- *
- * For V0 and V1 with no compression, it's unclear if Records.LOG_OVERHEAD or 0 should be chosen. There is no header
- * per batch, but a sequence of batches is preceded by the offset and size. This method returns `0` as it's what
- * `MemoryRecordsBuilder` requires.
- */
- public static int recordBatchHeaderSizeInBytes(byte magic, CompressionType compressionType) {
- if (magic > RecordBatch.MAGIC_VALUE_V1) {
- return DefaultRecordBatch.RECORD_BATCH_OVERHEAD;
- } else if (compressionType != CompressionType.NONE) {
- return Records.LOG_OVERHEAD + LegacyRecord.recordOverhead(magic);
- } else {
- return 0;
- }
- }
private static class RecordBatchAndRecords {
private final RecordBatch batch;
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
index 8d28521..c0ebef1 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
@@ -73,7 +73,7 @@ public abstract class AbstractResponse extends AbstractRequestResponse {
case PRODUCE:
return new ProduceResponse(struct);
case FETCH:
- return new FetchResponse(struct);
+ return FetchResponse.parse(struct);
case LIST_OFFSETS:
return new ListOffsetResponse(struct);
case METADATA:
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
index 103821b..16e3396 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
@@ -18,7 +18,7 @@ package org.apache.kafka.common.requests;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.network.ByteBufferSend;
-import org.apache.kafka.common.network.MultiSend;
+import org.apache.kafka.common.record.MultiRecordsSend;
import org.apache.kafka.common.network.Send;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
@@ -26,7 +26,8 @@ import org.apache.kafka.common.protocol.types.ArrayOf;
import org.apache.kafka.common.protocol.types.Field;
import org.apache.kafka.common.protocol.types.Schema;
import org.apache.kafka.common.protocol.types.Struct;
-import org.apache.kafka.common.record.Records;
+import org.apache.kafka.common.record.BaseRecords;
+import org.apache.kafka.common.record.MemoryRecords;
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
@@ -50,7 +51,7 @@ import static org.apache.kafka.common.requests.FetchMetadata.INVALID_SESSION_ID;
/**
* This wrapper supports all versions of the Fetch API
*/
-public class FetchResponse extends AbstractResponse {
+public class FetchResponse<T extends BaseRecords> extends AbstractResponse {
private static final String RESPONSES_KEY_NAME = "responses";
@@ -156,10 +157,10 @@ public class FetchResponse extends AbstractResponse {
public static final Field.Int32 SESSION_ID = new Field.Int32("session_id", "The fetch session ID");
private static final Schema FETCH_RESPONSE_V7 = new Schema(
- THROTTLE_TIME_MS,
- ERROR_CODE,
- SESSION_ID,
- new Field(RESPONSES_KEY_NAME, new ArrayOf(FETCH_RESPONSE_TOPIC_V5)));
+ THROTTLE_TIME_MS,
+ ERROR_CODE,
+ SESSION_ID,
+ new Field(RESPONSES_KEY_NAME, new ArrayOf(FETCH_RESPONSE_TOPIC_V5)));
/**
* The version number is bumped to indicate that on quota violation brokers send out responses before throttling.
@@ -190,7 +191,7 @@ public class FetchResponse extends AbstractResponse {
private final int throttleTimeMs;
private final Errors error;
private final int sessionId;
- private final LinkedHashMap<TopicPartition, PartitionData> responseData;
+ private final LinkedHashMap<TopicPartition, PartitionData<T>> responseData;
public static final class AbortedTransaction {
public final long producerId;
@@ -226,20 +227,20 @@ public class FetchResponse extends AbstractResponse {
}
}
- public static final class PartitionData {
+ public static final class PartitionData<T extends BaseRecords> {
public final Errors error;
public final long highWatermark;
public final long lastStableOffset;
public final long logStartOffset;
public final List<AbortedTransaction> abortedTransactions;
- public final Records records;
+ public final T records;
public PartitionData(Errors error,
long highWatermark,
long lastStableOffset,
long logStartOffset,
List<AbortedTransaction> abortedTransactions,
- Records records) {
+ T records) {
this.error = error;
this.highWatermark = highWatermark;
this.lastStableOffset = lastStableOffset;
@@ -297,16 +298,18 @@ public class FetchResponse extends AbstractResponse {
* @param throttleTimeMs The time in milliseconds that the response was throttled
* @param sessionId The fetch session id.
*/
- public FetchResponse(Errors error, LinkedHashMap<TopicPartition, PartitionData> responseData,
- int throttleTimeMs, int sessionId) {
+ public FetchResponse(Errors error,
+ LinkedHashMap<TopicPartition, PartitionData<T>> responseData,
+ int throttleTimeMs,
+ int sessionId) {
this.error = error;
this.responseData = responseData;
this.throttleTimeMs = throttleTimeMs;
this.sessionId = sessionId;
}
- public FetchResponse(Struct struct) {
- LinkedHashMap<TopicPartition, PartitionData> responseData = new LinkedHashMap<>();
+ public static FetchResponse<MemoryRecords> parse(Struct struct) {
+ LinkedHashMap<TopicPartition, PartitionData<MemoryRecords>> responseData = new LinkedHashMap<>();
for (Object topicResponseObj : struct.getArray(RESPONSES_KEY_NAME)) {
Struct topicResponse = (Struct) topicResponseObj;
String topic = topicResponse.get(TOPIC_NAME);
@@ -323,7 +326,10 @@ public class FetchResponse extends AbstractResponse {
if (partitionResponseHeader.hasField(LOG_START_OFFSET_KEY_NAME))
logStartOffset = partitionResponseHeader.getLong(LOG_START_OFFSET_KEY_NAME);
- Records records = partitionResponse.getRecords(RECORD_SET_KEY_NAME);
+ BaseRecords baseRecords = partitionResponse.getRecords(RECORD_SET_KEY_NAME);
+ if (!(baseRecords instanceof MemoryRecords))
+ throw new IllegalStateException("Unknown records type found: " + baseRecords.getClass());
+ MemoryRecords records = (MemoryRecords) baseRecords;
List<AbortedTransaction> abortedTransactions = null;
if (partitionResponseHeader.hasField(ABORTED_TRANSACTIONS_KEY_NAME)) {
@@ -339,15 +345,13 @@ public class FetchResponse extends AbstractResponse {
}
}
- PartitionData partitionData = new PartitionData(error, highWatermark, lastStableOffset, logStartOffset,
- abortedTransactions, records);
+ PartitionData<MemoryRecords> partitionData = new PartitionData<>(error, highWatermark, lastStableOffset,
+ logStartOffset, abortedTransactions, records);
responseData.put(new TopicPartition(topic, partition), partitionData);
}
}
- this.responseData = responseData;
- this.throttleTimeMs = struct.getOrElse(THROTTLE_TIME_MS, DEFAULT_THROTTLE_TIME);
- this.error = Errors.forCode(struct.getOrElse(ERROR_CODE, (short) 0));
- this.sessionId = struct.getOrElse(SESSION_ID, INVALID_SESSION_ID);
+ return new FetchResponse<>(Errors.forCode(struct.getOrElse(ERROR_CODE, (short) 0)), responseData,
+ struct.getOrElse(THROTTLE_TIME_MS, DEFAULT_THROTTLE_TIME), struct.getOrElse(SESSION_ID, INVALID_SESSION_ID));
}
@Override
@@ -369,14 +373,14 @@ public class FetchResponse extends AbstractResponse {
Queue<Send> sends = new ArrayDeque<>();
sends.add(new ByteBufferSend(dest, buffer));
addResponseData(responseBodyStruct, throttleTimeMs, dest, sends);
- return new MultiSend(dest, sends);
+ return new MultiRecordsSend(dest, sends);
}
public Errors error() {
return error;
}
- public LinkedHashMap<TopicPartition, PartitionData> responseData() {
+ public LinkedHashMap<TopicPartition, PartitionData<T>> responseData() {
return responseData;
}
@@ -397,8 +401,8 @@ public class FetchResponse extends AbstractResponse {
return errorCounts;
}
- public static FetchResponse parse(ByteBuffer buffer, short version) {
- return new FetchResponse(ApiKeys.FETCH.responseSchema(version).read(buffer));
+ public static FetchResponse<MemoryRecords> parse(ByteBuffer buffer, short version) {
+ return parse(ApiKeys.FETCH.responseSchema(version).read(buffer));
}
private static void addResponseData(Struct struct, int throttleTimeMs, String dest, Queue<Send> sends) {
@@ -446,7 +450,7 @@ public class FetchResponse extends AbstractResponse {
private static void addPartitionData(String dest, Queue<Send> sends, Struct partitionData) {
Struct header = partitionData.getStruct(PARTITION_HEADER_KEY_NAME);
- Records records = partitionData.getRecords(RECORD_SET_KEY_NAME);
+ BaseRecords records = partitionData.getRecords(RECORD_SET_KEY_NAME);
// include the partition header and the size of the record set
ByteBuffer buffer = ByteBuffer.allocate(header.sizeOf() + 4);
@@ -456,24 +460,25 @@ public class FetchResponse extends AbstractResponse {
sends.add(new ByteBufferSend(dest, buffer));
// finally the send for the record set itself
- sends.add(new RecordsSend(dest, records));
+ sends.add(records.toSend(dest));
}
- private static Struct toStruct(short version, int throttleTimeMs, Errors error,
- Iterator<Map.Entry<TopicPartition, PartitionData>> partIterator, int sessionId) {
+ private static <T extends BaseRecords> Struct toStruct(short version, int throttleTimeMs, Errors error,
+ Iterator<Map.Entry<TopicPartition, PartitionData<T>>> partIterator,
+ int sessionId) {
Struct struct = new Struct(ApiKeys.FETCH.responseSchema(version));
struct.setIfExists(THROTTLE_TIME_MS, throttleTimeMs);
struct.setIfExists(ERROR_CODE, error.code());
struct.setIfExists(SESSION_ID, sessionId);
- List<FetchRequest.TopicAndPartitionData<PartitionData>> topicsData =
- FetchRequest.TopicAndPartitionData.batchByTopic(partIterator);
+ List<FetchRequest.TopicAndPartitionData<PartitionData<T>>> topicsData =
+ FetchRequest.TopicAndPartitionData.batchByTopic(partIterator);
List<Struct> topicArray = new ArrayList<>();
- for (FetchRequest.TopicAndPartitionData<PartitionData> topicEntry: topicsData) {
+ for (FetchRequest.TopicAndPartitionData<PartitionData<T>> topicEntry: topicsData) {
Struct topicData = struct.instance(RESPONSES_KEY_NAME);
topicData.set(TOPIC_NAME, topicEntry.topic);
List<Struct> partitionArray = new ArrayList<>();
- for (Map.Entry<Integer, PartitionData> partitionEntry : topicEntry.partitions.entrySet()) {
- PartitionData fetchPartitionData = partitionEntry.getValue();
+ for (Map.Entry<Integer, PartitionData<T>> partitionEntry : topicEntry.partitions.entrySet()) {
+ PartitionData<T> fetchPartitionData = partitionEntry.getValue();
short errorCode = fetchPartitionData.error.code();
// If consumer sends FetchRequest V5 or earlier, the client library is not guaranteed to recognize the error code
// for KafkaStorageException. In this case the client library will translate KafkaStorageException to
@@ -524,7 +529,8 @@ public class FetchResponse extends AbstractResponse {
* @param partIterator The partition iterator.
* @return The response size in bytes.
*/
- public static int sizeOf(short version, Iterator<Map.Entry<TopicPartition, PartitionData>> partIterator) {
+ public static <T extends BaseRecords> int sizeOf(short version,
+ Iterator<Map.Entry<TopicPartition, PartitionData<T>>> partIterator) {
// Since the throttleTimeMs and metadata field sizes are constant and fixed, we can
// use arbitrary values here without affecting the result.
return 4 + toStruct(version, 0, Errors.NONE, partIterator, INVALID_SESSION_ID).sizeOf();
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
index ce722cf..4886c6b 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
@@ -44,11 +44,10 @@ import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.MemoryRecordsBuilder;
import org.apache.kafka.common.record.TimestampType;
+import org.apache.kafka.common.requests.FetchResponse;
import org.apache.kafka.common.requests.AbstractRequest;
import org.apache.kafka.common.requests.AbstractResponse;
import org.apache.kafka.common.requests.FetchRequest;
-import org.apache.kafka.common.requests.FetchResponse;
-import org.apache.kafka.common.requests.FetchResponse.PartitionData;
import org.apache.kafka.common.requests.FindCoordinatorResponse;
import org.apache.kafka.common.requests.HeartbeatResponse;
import org.apache.kafka.common.requests.IsolationLevel;
@@ -1709,8 +1708,8 @@ public class KafkaConsumerTest {
}
- private FetchResponse fetchResponse(Map<TopicPartition, FetchInfo> fetches) {
- LinkedHashMap<TopicPartition, PartitionData> tpResponses = new LinkedHashMap<>();
+ private FetchResponse<MemoryRecords> fetchResponse(Map<TopicPartition, FetchInfo> fetches) {
+ LinkedHashMap<TopicPartition, FetchResponse.PartitionData<MemoryRecords>> tpResponses = new LinkedHashMap<>();
for (Map.Entry<TopicPartition, FetchInfo> fetchEntry : fetches.entrySet()) {
TopicPartition partition = fetchEntry.getKey();
long fetchOffset = fetchEntry.getValue().offset;
@@ -1725,8 +1724,10 @@ public class KafkaConsumerTest {
builder.append(0L, ("key-" + i).getBytes(), ("value-" + i).getBytes());
records = builder.build();
}
- tpResponses.put(partition, new FetchResponse.PartitionData(Errors.NONE, 0, FetchResponse.INVALID_LAST_STABLE_OFFSET, 0L,
- null, records));
+ tpResponses.put(partition,
+ new FetchResponse.PartitionData(
+ Errors.NONE, 0, FetchResponse.INVALID_LAST_STABLE_OFFSET,
+ 0L, null, records));
}
return new FetchResponse(Errors.NONE, tpResponses, 0, INVALID_SESSION_ID);
}
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
index 76cde8e..9164daa 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
@@ -62,11 +62,11 @@ import org.apache.kafka.common.record.RecordBatch;
import org.apache.kafka.common.record.Records;
import org.apache.kafka.common.record.SimpleRecord;
import org.apache.kafka.common.record.TimestampType;
+import org.apache.kafka.common.requests.FetchResponse;
import org.apache.kafka.common.requests.AbstractRequest;
import org.apache.kafka.common.requests.ApiVersionsResponse;
import org.apache.kafka.common.requests.FetchRequest;
import org.apache.kafka.common.requests.FetchRequest.PartitionData;
-import org.apache.kafka.common.requests.FetchResponse;
import org.apache.kafka.common.requests.IsolationLevel;
import org.apache.kafka.common.requests.ListOffsetRequest;
import org.apache.kafka.common.requests.ListOffsetResponse;
@@ -885,15 +885,15 @@ public class FetcherTest {
Map<TopicPartition, FetchResponse.PartitionData> partitions = new LinkedHashMap<>();
partitions.put(tp1, new FetchResponse.PartitionData(Errors.NONE, 100, FetchResponse.INVALID_LAST_STABLE_OFFSET,
- FetchResponse.INVALID_LOG_START_OFFSET, null, records));
+ FetchResponse.INVALID_LOG_START_OFFSET, null, records));
partitions.put(tp0, new FetchResponse.PartitionData(Errors.OFFSET_OUT_OF_RANGE, 100,
- FetchResponse.INVALID_LAST_STABLE_OFFSET, FetchResponse.INVALID_LOG_START_OFFSET, null, MemoryRecords.EMPTY));
+ FetchResponse.INVALID_LAST_STABLE_OFFSET, FetchResponse.INVALID_LOG_START_OFFSET, null, MemoryRecords.EMPTY));
partitions.put(tp2, new FetchResponse.PartitionData(Errors.NONE, 100L, 4,
- 0L, null, nextRecords));
+ 0L, null, nextRecords));
partitions.put(tp3, new FetchResponse.PartitionData(Errors.NONE, 100L, 4,
- 0L, null, partialRecords));
+ 0L, null, partialRecords));
client.prepareResponse(new FetchResponse(Errors.NONE, new LinkedHashMap<>(partitions),
- 0, INVALID_SESSION_ID));
+ 0, INVALID_SESSION_ID));
consumerClient.poll(0);
List<ConsumerRecord<byte[], byte[]>> fetchedRecords = new ArrayList<>();
@@ -948,7 +948,7 @@ public class FetcherTest {
assertEquals(1, fetcher.sendFetches());
Map<TopicPartition, FetchResponse.PartitionData> partitions = new HashMap<>();
partitions.put(tp0, new FetchResponse.PartitionData(Errors.NONE, 100,
- FetchResponse.INVALID_LAST_STABLE_OFFSET, FetchResponse.INVALID_LOG_START_OFFSET, null, records));
+ FetchResponse.INVALID_LAST_STABLE_OFFSET, FetchResponse.INVALID_LOG_START_OFFSET, null, records));
client.prepareResponse(fullFetchResponse(tp0, this.records, Errors.NONE, 100L, 0));
consumerClient.poll(0);
@@ -959,7 +959,7 @@ public class FetcherTest {
assertEquals(1, fetcher.sendFetches());
partitions = new HashMap<>();
partitions.put(tp1, new FetchResponse.PartitionData(Errors.OFFSET_OUT_OF_RANGE, 100,
- FetchResponse.INVALID_LAST_STABLE_OFFSET, FetchResponse.INVALID_LOG_START_OFFSET, null, MemoryRecords.EMPTY));
+ FetchResponse.INVALID_LAST_STABLE_OFFSET, FetchResponse.INVALID_LOG_START_OFFSET, null, MemoryRecords.EMPTY));
client.prepareResponse(new FetchResponse(Errors.NONE, new LinkedHashMap<>(partitions), 0, INVALID_SESSION_ID));
consumerClient.poll(0);
assertEquals(1, fetcher.fetchedRecords().get(tp0).size());
@@ -1660,7 +1660,7 @@ public class FetcherTest {
assertEquals(1, fetcher.sendFetches());
client.prepareResponse(new FetchResponse(Errors.NONE, new LinkedHashMap<>(partitions),
- 0, INVALID_SESSION_ID));
+ 0, INVALID_SESSION_ID));
consumerClient.poll(0);
fetcher.fetchedRecords();
@@ -1700,7 +1700,7 @@ public class FetcherTest {
MemoryRecords.withRecords(CompressionType.NONE, new SimpleRecord("val".getBytes()))));
client.prepareResponse(new FetchResponse(Errors.NONE, new LinkedHashMap<>(partitions),
- 0, INVALID_SESSION_ID));
+ 0, INVALID_SESSION_ID));
consumerClient.poll(0);
fetcher.fetchedRecords();
@@ -2267,9 +2267,9 @@ public class FetcherTest {
// Fetch some records and establish an incremental fetch session.
LinkedHashMap<TopicPartition, FetchResponse.PartitionData> partitions1 = new LinkedHashMap<>();
partitions1.put(tp0, new FetchResponse.PartitionData(Errors.NONE, 2L,
- 2, 0L, null, this.records));
+ 2, 0L, null, this.records));
partitions1.put(tp1, new FetchResponse.PartitionData(Errors.NONE, 100L,
- FetchResponse.INVALID_LAST_STABLE_OFFSET, 0L, null, emptyRecords));
+ FetchResponse.INVALID_LAST_STABLE_OFFSET, 0L, null, emptyRecords));
FetchResponse resp1 = new FetchResponse(Errors.NONE, partitions1, 0, 123);
client.prepareResponse(resp1);
assertEquals(1, fetcher.sendFetches());
@@ -2308,7 +2308,7 @@ public class FetcherTest {
// The third response contains some new records for tp0.
LinkedHashMap<TopicPartition, FetchResponse.PartitionData> partitions3 = new LinkedHashMap<>();
partitions3.put(tp0, new FetchResponse.PartitionData(Errors.NONE, 100L,
- 4, 0L, null, this.nextRecords));
+ 4, 0L, null, this.nextRecords));
new FetchResponse(Errors.NONE, new LinkedHashMap<>(partitions1), 0, INVALID_SESSION_ID);
FetchResponse resp3 = new FetchResponse(Errors.NONE, partitions3, 0, 123);
client.prepareResponse(resp3);
@@ -2446,33 +2446,33 @@ public class FetcherTest {
return new ListOffsetResponse(allPartitionData);
}
- private FetchResponse fullFetchResponseWithAbortedTransactions(MemoryRecords records,
- List<FetchResponse.AbortedTransaction> abortedTransactions,
- Errors error,
- long lastStableOffset,
- long hw,
- int throttleTime) {
- Map<TopicPartition, FetchResponse.PartitionData> partitions = Collections.singletonMap(tp0,
- new FetchResponse.PartitionData(error, hw, lastStableOffset, 0L, abortedTransactions, records));
- return new FetchResponse(Errors.NONE, new LinkedHashMap<>(partitions), throttleTime, INVALID_SESSION_ID);
+ private FetchResponse<MemoryRecords> fullFetchResponseWithAbortedTransactions(MemoryRecords records,
+ List<FetchResponse.AbortedTransaction> abortedTransactions,
+ Errors error,
+ long lastStableOffset,
+ long hw,
+ int throttleTime) {
+ Map<TopicPartition, FetchResponse.PartitionData<MemoryRecords>> partitions = Collections.singletonMap(tp0,
+ new FetchResponse.PartitionData<>(error, hw, lastStableOffset, 0L, abortedTransactions, records));
+ return new FetchResponse<>(Errors.NONE, new LinkedHashMap<>(partitions), throttleTime, INVALID_SESSION_ID);
}
- private FetchResponse fullFetchResponse(TopicPartition tp, MemoryRecords records, Errors error, long hw, int throttleTime) {
+ private FetchResponse<MemoryRecords> fullFetchResponse(TopicPartition tp, MemoryRecords records, Errors error, long hw, int throttleTime) {
return fullFetchResponse(tp, records, error, hw, FetchResponse.INVALID_LAST_STABLE_OFFSET, throttleTime);
}
- private FetchResponse fullFetchResponse(TopicPartition tp, MemoryRecords records, Errors error, long hw,
- long lastStableOffset, int throttleTime) {
- Map<TopicPartition, FetchResponse.PartitionData> partitions = Collections.singletonMap(tp,
- new FetchResponse.PartitionData(error, hw, lastStableOffset, 0L, null, records));
- return new FetchResponse(Errors.NONE, new LinkedHashMap<>(partitions), throttleTime, INVALID_SESSION_ID);
+ private FetchResponse<MemoryRecords> fullFetchResponse(TopicPartition tp, MemoryRecords records, Errors error, long hw,
+ long lastStableOffset, int throttleTime) {
+ Map<TopicPartition, FetchResponse.PartitionData<MemoryRecords>> partitions = Collections.singletonMap(tp,
+ new FetchResponse.PartitionData<>(error, hw, lastStableOffset, 0L, null, records));
+ return new FetchResponse<>(Errors.NONE, new LinkedHashMap<>(partitions), throttleTime, INVALID_SESSION_ID);
}
- private FetchResponse fetchResponse(TopicPartition tp, MemoryRecords records, Errors error, long hw,
+ private FetchResponse<MemoryRecords> fetchResponse(TopicPartition tp, MemoryRecords records, Errors error, long hw,
long lastStableOffset, long logStartOffset, int throttleTime) {
- Map<TopicPartition, FetchResponse.PartitionData> partitions = Collections.singletonMap(tp,
- new FetchResponse.PartitionData(error, hw, lastStableOffset, logStartOffset, null, records));
- return new FetchResponse(Errors.NONE, new LinkedHashMap<>(partitions), throttleTime, INVALID_SESSION_ID);
+ Map<TopicPartition, FetchResponse.PartitionData<MemoryRecords>> partitions = Collections.singletonMap(tp,
+ new FetchResponse.PartitionData<>(error, hw, lastStableOffset, logStartOffset, null, records));
+ return new FetchResponse<>(Errors.NONE, new LinkedHashMap<>(partitions), throttleTime, INVALID_SESSION_ID);
}
private MetadataResponse newMetadataResponse(String topic, Errors error) {
diff --git a/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java b/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java
index fdd3ede..f8b6dd4 100644
--- a/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java
@@ -17,6 +17,7 @@
package org.apache.kafka.common.record;
import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.kafka.common.utils.MockTime;
@@ -38,11 +39,11 @@ import java.util.List;
import static java.util.Arrays.asList;
import static org.apache.kafka.test.TestUtils.tempFile;
+import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
-import static org.junit.Assert.assertArrayEquals;
public class FileRecordsTest {
@@ -347,6 +348,12 @@ public class FileRecordsTest {
Records messageV0 = slice.downConvert(RecordBatch.MAGIC_VALUE_V0, 0, time).records();
assertTrue("No message should be there", batches(messageV0).isEmpty());
assertEquals("There should be " + (size - 1) + " bytes", size - 1, messageV0.sizeInBytes());
+
+ // Lazy down-conversion will not return any messages for a partial input batch
+ TopicPartition tp = new TopicPartition("topic-1", 0);
+ LazyDownConversionRecords lazyRecords = new LazyDownConversionRecords(tp, slice, RecordBatch.MAGIC_VALUE_V0, 0, Time.SYSTEM);
+ Iterator<ConvertedRecords> it = lazyRecords.iterator(16 * 1024L);
+ assertTrue("No messages should be returned", !it.hasNext());
}
@Test
@@ -402,8 +409,7 @@ public class FileRecordsTest {
try (FileRecords fileRecords = FileRecords.open(tempFile())) {
fileRecords.append(MemoryRecords.readableRecords(buffer));
fileRecords.flush();
- Records convertedRecords = fileRecords.downConvert(toMagic, 0L, time).records();
- verifyConvertedRecords(records, offsets, convertedRecords, compressionType, toMagic);
+ downConvertAndVerifyRecords(records, offsets, fileRecords, compressionType, toMagic, 0L, time);
if (toMagic <= RecordBatch.MAGIC_VALUE_V1 && compressionType == CompressionType.NONE) {
long firstOffset;
@@ -411,17 +417,15 @@ public class FileRecordsTest {
firstOffset = 11L; // v1 record
else
firstOffset = 17; // v2 record
- Records convertedRecords2 = fileRecords.downConvert(toMagic, firstOffset, time).records();
List<Long> filteredOffsets = new ArrayList<>(offsets);
List<SimpleRecord> filteredRecords = new ArrayList<>(records);
int index = filteredOffsets.indexOf(firstOffset) - 1;
filteredRecords.remove(index);
filteredOffsets.remove(index);
- verifyConvertedRecords(filteredRecords, filteredOffsets, convertedRecords2, compressionType, toMagic);
+ downConvertAndVerifyRecords(filteredRecords, filteredOffsets, fileRecords, compressionType, toMagic, firstOffset, time);
} else {
// firstOffset doesn't have any effect in this case
- Records convertedRecords2 = fileRecords.downConvert(toMagic, 10L, time).records();
- verifyConvertedRecords(records, offsets, convertedRecords2, compressionType, toMagic);
+ downConvertAndVerifyRecords(records, offsets, fileRecords, compressionType, toMagic, 10L, time);
}
}
}
@@ -430,40 +434,98 @@ public class FileRecordsTest {
return Utils.utf8(buffer, buffer.remaining());
}
+ private void downConvertAndVerifyRecords(List<SimpleRecord> initialRecords,
+ List<Long> initialOffsets,
+ FileRecords fileRecords,
+ CompressionType compressionType,
+ byte toMagic,
+ long firstOffset,
+ Time time) {
+ long numBatches = 0;
+ long minBatchSize = Long.MAX_VALUE;
+ long maxBatchSize = Long.MIN_VALUE;
+ for (RecordBatch batch : fileRecords.batches()) {
+ minBatchSize = Math.min(minBatchSize, batch.sizeInBytes());
+ maxBatchSize = Math.max(maxBatchSize, batch.sizeInBytes());
+ numBatches++;
+ }
+
+ // Test the normal down-conversion path
+ List<Records> convertedRecords = new ArrayList<>();
+ convertedRecords.add(fileRecords.downConvert(toMagic, firstOffset, time).records());
+ verifyConvertedRecords(initialRecords, initialOffsets, convertedRecords, compressionType, toMagic);
+ convertedRecords.clear();
+
+ // Test the lazy down-conversion path
+ List<Long> maximumReadSize = asList(16L * 1024L,
+ (long) fileRecords.sizeInBytes(),
+ (long) fileRecords.sizeInBytes() - 1,
+ (long) fileRecords.sizeInBytes() / 4,
+ maxBatchSize + 1,
+ 1L);
+ for (long readSize : maximumReadSize) {
+ TopicPartition tp = new TopicPartition("topic-1", 0);
+ LazyDownConversionRecords lazyRecords = new LazyDownConversionRecords(tp, fileRecords, toMagic, firstOffset, Time.SYSTEM);
+ Iterator<ConvertedRecords> it = lazyRecords.iterator(readSize);
+ while (it.hasNext())
+ convertedRecords.add(it.next().records());
+
+ // Check if chunking works as expected. The only way to predictably test for this is by testing the edge cases.
+ // 1. If maximum read size is greater than the size of all batches combined, we must get all down-conversion
+ // records in exactly two batches; the first chunk is pre down-converted and returned, and the second chunk
+ // contains the remaining batches.
+ // 2. If maximum read size is just smaller than the size of all batches combined, we must get results in two
+ // chunks.
+ // 3. If maximum read size is less than the size of a single record, we get one batch in each chunk.
+ if (readSize >= fileRecords.sizeInBytes())
+ assertEquals(2, convertedRecords.size());
+ else if (readSize == fileRecords.sizeInBytes() - 1)
+ assertEquals(2, convertedRecords.size());
+ else if (readSize <= minBatchSize)
+ assertEquals(numBatches, convertedRecords.size());
+
+ verifyConvertedRecords(initialRecords, initialOffsets, convertedRecords, compressionType, toMagic);
+ convertedRecords.clear();
+ }
+ }
+
private void verifyConvertedRecords(List<SimpleRecord> initialRecords,
List<Long> initialOffsets,
- Records convertedRecords,
+ List<Records> convertedRecordsList,
CompressionType compressionType,
byte magicByte) {
int i = 0;
- for (RecordBatch batch : convertedRecords.batches()) {
- assertTrue("Magic byte should be lower than or equal to " + magicByte, batch.magic() <= magicByte);
- if (batch.magic() == RecordBatch.MAGIC_VALUE_V0)
- assertEquals(TimestampType.NO_TIMESTAMP_TYPE, batch.timestampType());
- else
- assertEquals(TimestampType.CREATE_TIME, batch.timestampType());
- assertEquals("Compression type should not be affected by conversion", compressionType, batch.compressionType());
- for (Record record : batch) {
- assertTrue("Inner record should have magic " + magicByte, record.hasMagic(batch.magic()));
- assertEquals("Offset should not change", initialOffsets.get(i).longValue(), record.offset());
- assertEquals("Key should not change", utf8(initialRecords.get(i).key()), utf8(record.key()));
- assertEquals("Value should not change", utf8(initialRecords.get(i).value()), utf8(record.value()));
- assertFalse(record.hasTimestampType(TimestampType.LOG_APPEND_TIME));
- if (batch.magic() == RecordBatch.MAGIC_VALUE_V0) {
- assertEquals(RecordBatch.NO_TIMESTAMP, record.timestamp());
- assertFalse(record.hasTimestampType(TimestampType.CREATE_TIME));
- assertTrue(record.hasTimestampType(TimestampType.NO_TIMESTAMP_TYPE));
- } else if (batch.magic() == RecordBatch.MAGIC_VALUE_V1) {
- assertEquals("Timestamp should not change", initialRecords.get(i).timestamp(), record.timestamp());
- assertTrue(record.hasTimestampType(TimestampType.CREATE_TIME));
- assertFalse(record.hasTimestampType(TimestampType.NO_TIMESTAMP_TYPE));
- } else {
- assertEquals("Timestamp should not change", initialRecords.get(i).timestamp(), record.timestamp());
- assertFalse(record.hasTimestampType(TimestampType.CREATE_TIME));
- assertFalse(record.hasTimestampType(TimestampType.NO_TIMESTAMP_TYPE));
- assertArrayEquals("Headers should not change", initialRecords.get(i).headers(), record.headers());
+
+ for (Records convertedRecords : convertedRecordsList) {
+ for (RecordBatch batch : convertedRecords.batches()) {
+ assertTrue("Magic byte should be lower than or equal to " + magicByte, batch.magic() <= magicByte);
+ if (batch.magic() == RecordBatch.MAGIC_VALUE_V0)
+ assertEquals(TimestampType.NO_TIMESTAMP_TYPE, batch.timestampType());
+ else
+ assertEquals(TimestampType.CREATE_TIME, batch.timestampType());
+ assertEquals("Compression type should not be affected by conversion", compressionType, batch.compressionType());
+ for (Record record : batch) {
+ assertTrue("Inner record should have magic " + magicByte, record.hasMagic(batch.magic()));
+ assertEquals("Offset should not change", initialOffsets.get(i).longValue(), record.offset());
+ assertEquals("Key should not change", utf8(initialRecords.get(i).key()), utf8(record.key()));
+ assertEquals("Value should not change", utf8(initialRecords.get(i).value()), utf8(record.value()));
+ assertFalse(record.hasTimestampType(TimestampType.LOG_APPEND_TIME));
+ if (batch.magic() == RecordBatch.MAGIC_VALUE_V0) {
+ assertEquals(RecordBatch.NO_TIMESTAMP, record.timestamp());
+ assertFalse(record.hasTimestampType(TimestampType.CREATE_TIME));
+ assertTrue(record.hasTimestampType(TimestampType.NO_TIMESTAMP_TYPE));
+ } else if (batch.magic() == RecordBatch.MAGIC_VALUE_V1) {
+ assertEquals("Timestamp should not change", initialRecords.get(i).timestamp(), record.timestamp());
+ assertTrue(record.hasTimestampType(TimestampType.CREATE_TIME));
+ assertFalse(record.hasTimestampType(TimestampType.NO_TIMESTAMP_TYPE));
+ } else {
+ assertEquals("Timestamp should not change", initialRecords.get(i).timestamp(), record.timestamp());
+ assertFalse(record.hasTimestampType(TimestampType.CREATE_TIME));
+ assertFalse(record.hasTimestampType(TimestampType.NO_TIMESTAMP_TYPE));
+ assertArrayEquals("Headers should not change", initialRecords.get(i).headers(), record.headers());
+ }
+ i += 1;
}
- i += 1;
}
}
assertEquals(initialOffsets.size(), i);
@@ -490,5 +552,4 @@ public class FileRecordsTest {
}
fileRecords.flush();
}
-
}
diff --git a/clients/src/test/java/org/apache/kafka/common/record/LazyDownConversionRecordsTest.java b/clients/src/test/java/org/apache/kafka/common/record/LazyDownConversionRecordsTest.java
new file mode 100644
index 0000000..8765603
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/common/record/LazyDownConversionRecordsTest.java
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.record;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.header.Header;
+import org.apache.kafka.common.header.internals.RecordHeader;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Utils;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+import static java.util.Arrays.asList;
+import static org.apache.kafka.test.TestUtils.tempFile;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+@RunWith(value = Parameterized.class)
+public class LazyDownConversionRecordsTest {
+ private final CompressionType compressionType;
+ private final byte toMagic;
+ private final DownConversionTest test;
+
+ public LazyDownConversionRecordsTest(CompressionType compressionType, byte toMagic, DownConversionTest test) {
+ this.compressionType = compressionType;
+ this.toMagic = toMagic;
+ this.test = test;
+ }
+
+ enum DownConversionTest {
+ DEFAULT,
+ OVERFLOW,
+ }
+
+ @Parameterized.Parameters(name = "compressionType={0}, toMagic={1}, test={2}")
+ public static Collection<Object[]> data() {
+ List<Object[]> values = new ArrayList<>();
+ for (byte toMagic = RecordBatch.MAGIC_VALUE_V0; toMagic <= RecordBatch.CURRENT_MAGIC_VALUE; toMagic++) {
+ for (DownConversionTest test : DownConversionTest.values()) {
+ values.add(new Object[]{CompressionType.NONE, toMagic, test});
+ values.add(new Object[]{CompressionType.GZIP, toMagic, test});
+ }
+ }
+ return values;
+ }
+
+ @Test
+ public void doTestConversion() throws IOException {
+ List<Long> offsets = asList(0L, 2L, 3L, 9L, 11L, 15L, 16L, 17L, 22L, 24L);
+
+ Header[] headers = {new RecordHeader("headerKey1", "headerValue1".getBytes()),
+ new RecordHeader("headerKey2", "headerValue2".getBytes()),
+ new RecordHeader("headerKey3", "headerValue3".getBytes())};
+
+ List<SimpleRecord> records = asList(
+ new SimpleRecord(1L, "k1".getBytes(), "hello".getBytes()),
+ new SimpleRecord(2L, "k2".getBytes(), "goodbye".getBytes()),
+ new SimpleRecord(3L, "k3".getBytes(), "hello again".getBytes()),
+ new SimpleRecord(4L, "k4".getBytes(), "goodbye for now".getBytes()),
+ new SimpleRecord(5L, "k5".getBytes(), "hello again".getBytes()),
+ new SimpleRecord(6L, "k6".getBytes(), "I sense indecision".getBytes()),
+ new SimpleRecord(7L, "k7".getBytes(), "what now".getBytes()),
+ new SimpleRecord(8L, "k8".getBytes(), "running out".getBytes(), headers),
+ new SimpleRecord(9L, "k9".getBytes(), "ok, almost done".getBytes()),
+ new SimpleRecord(10L, "k10".getBytes(), "finally".getBytes(), headers));
+ assertEquals("incorrect test setup", offsets.size(), records.size());
+
+ ByteBuffer buffer = ByteBuffer.allocate(1024);
+ MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, RecordBatch.CURRENT_MAGIC_VALUE, compressionType,
+ TimestampType.CREATE_TIME, 0L);
+ for (int i = 0; i < 3; i++)
+ builder.appendWithOffset(offsets.get(i), records.get(i));
+ builder.close();
+
+ builder = MemoryRecords.builder(buffer, RecordBatch.CURRENT_MAGIC_VALUE, compressionType, TimestampType.CREATE_TIME,
+ 0L);
+ for (int i = 3; i < 6; i++)
+ builder.appendWithOffset(offsets.get(i), records.get(i));
+ builder.close();
+
+ builder = MemoryRecords.builder(buffer, RecordBatch.CURRENT_MAGIC_VALUE, compressionType, TimestampType.CREATE_TIME,
+ 0L);
+ for (int i = 6; i < 10; i++)
+ builder.appendWithOffset(offsets.get(i), records.get(i));
+ builder.close();
+
+ buffer.flip();
+
+ try (FileRecords inputRecords = FileRecords.open(tempFile())) {
+ MemoryRecords memoryRecords = MemoryRecords.readableRecords(buffer);
+ inputRecords.append(memoryRecords);
+ inputRecords.flush();
+
+ LazyDownConversionRecords lazyRecords = new LazyDownConversionRecords(new TopicPartition("test", 1),
+ inputRecords, toMagic, 0L, Time.SYSTEM);
+ LazyDownConversionRecordsSend lazySend = lazyRecords.toSend("foo");
+ File outputFile = tempFile();
+ FileChannel channel = new RandomAccessFile(outputFile, "rw").getChannel();
+
+ // Size of lazy records is at least as much as the size of underlying records
+ assertTrue(lazyRecords.sizeInBytes() >= inputRecords.sizeInBytes());
+
+ int toWrite;
+ int written = 0;
+ List<SimpleRecord> recordsBeingConverted;
+ List<Long> offsetsOfRecords;
+ switch (test) {
+ case DEFAULT:
+ toWrite = inputRecords.sizeInBytes();
+ recordsBeingConverted = records;
+ offsetsOfRecords = offsets;
+ break;
+ case OVERFLOW:
+ toWrite = inputRecords.sizeInBytes() * 2;
+ recordsBeingConverted = records;
+ offsetsOfRecords = offsets;
+ break;
+ default:
+ throw new IllegalArgumentException();
+ }
+ while (written < toWrite)
+ written += lazySend.writeTo(channel, written, toWrite - written);
+
+ FileRecords convertedRecords = FileRecords.open(outputFile, true, (int) channel.size(), false);
+ ByteBuffer convertedRecordsBuffer = ByteBuffer.allocate(convertedRecords.sizeInBytes());
+ convertedRecords.readInto(convertedRecordsBuffer, 0);
+ MemoryRecords convertedMemoryRecords = MemoryRecords.readableRecords(convertedRecordsBuffer);
+ verifyDownConvertedRecords(recordsBeingConverted, offsetsOfRecords, convertedMemoryRecords, compressionType, toMagic);
+
+ convertedRecords.close();
+ channel.close();
+ }
+ }
+
+ private String utf8(ByteBuffer buffer) {
+ return Utils.utf8(buffer, buffer.remaining());
+ }
+
+ private void verifyDownConvertedRecords(List<SimpleRecord> initialRecords,
+ List<Long> initialOffsets,
+ MemoryRecords downConvertedRecords,
+ CompressionType compressionType,
+ byte toMagic) {
+ int i = 0;
+ for (RecordBatch batch : downConvertedRecords.batches()) {
+ assertTrue("Magic byte should be lower than or equal to " + toMagic, batch.magic() <= toMagic);
+ if (batch.magic() == RecordBatch.MAGIC_VALUE_V0)
+ assertEquals(TimestampType.NO_TIMESTAMP_TYPE, batch.timestampType());
+ else
+ assertEquals(TimestampType.CREATE_TIME, batch.timestampType());
+ assertEquals("Compression type should not be affected by conversion", compressionType, batch.compressionType());
+ for (Record record : batch) {
+ assertTrue("Inner record should have magic " + toMagic, record.hasMagic(batch.magic()));
+ assertEquals("Offset should not change", initialOffsets.get(i).longValue(), record.offset());
+ assertEquals("Key should not change", utf8(initialRecords.get(i).key()), utf8(record.key()));
+ assertEquals("Value should not change", utf8(initialRecords.get(i).value()), utf8(record.value()));
+ assertFalse(record.hasTimestampType(TimestampType.LOG_APPEND_TIME));
+ if (batch.magic() == RecordBatch.MAGIC_VALUE_V0) {
+ assertEquals(RecordBatch.NO_TIMESTAMP, record.timestamp());
+ assertFalse(record.hasTimestampType(TimestampType.CREATE_TIME));
+ assertTrue(record.hasTimestampType(TimestampType.NO_TIMESTAMP_TYPE));
+ } else if (batch.magic() == RecordBatch.MAGIC_VALUE_V1) {
+ assertEquals("Timestamp should not change", initialRecords.get(i).timestamp(), record.timestamp());
+ assertTrue(record.hasTimestampType(TimestampType.CREATE_TIME));
+ assertFalse(record.hasTimestampType(TimestampType.NO_TIMESTAMP_TYPE));
+ } else {
+ assertEquals("Timestamp should not change", initialRecords.get(i).timestamp(), record.timestamp());
+ assertFalse(record.hasTimestampType(TimestampType.CREATE_TIME));
+ assertFalse(record.hasTimestampType(TimestampType.NO_TIMESTAMP_TYPE));
+ assertArrayEquals("Headers should not change", initialRecords.get(i).headers(), record.headers());
+ }
+ i += 1;
+ }
+ }
+ assertEquals(initialOffsets.size(), i);
+ }
+}
diff --git a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java
index a90fb29..36b14a2 100644
--- a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java
@@ -472,7 +472,7 @@ public class MemoryRecordsBuilderTest {
MemoryRecords records = convertedRecords.records();
// Transactional markers are skipped when down converting to V1, so exclude them from size
- verifyRecordsProcessingStats(convertedRecords.recordsProcessingStats(),
+ verifyRecordsProcessingStats(convertedRecords.recordConversionStats(),
3, 3, records.sizeInBytes(), sizeExcludingTxnMarkers);
List<? extends RecordBatch> batches = Utils.toList(records.batches().iterator());
@@ -513,7 +513,7 @@ public class MemoryRecordsBuilderTest {
ConvertedRecords<MemoryRecords> convertedRecords = MemoryRecords.readableRecords(buffer)
.downConvert(RecordBatch.MAGIC_VALUE_V1, 0, time);
MemoryRecords records = convertedRecords.records();
- verifyRecordsProcessingStats(convertedRecords.recordsProcessingStats(), 3, 2,
+ verifyRecordsProcessingStats(convertedRecords.recordConversionStats(), 3, 2,
records.sizeInBytes(), buffer.limit());
List<? extends RecordBatch> batches = Utils.toList(records.batches().iterator());
@@ -553,7 +553,7 @@ public class MemoryRecordsBuilderTest {
assertEquals("1", utf8(logRecords.get(0).key()));
assertEquals("2", utf8(logRecords.get(1).key()));
assertEquals("3", utf8(logRecords.get(2).key()));
- verifyRecordsProcessingStats(convertedRecords.recordsProcessingStats(), 3, 2,
+ verifyRecordsProcessingStats(convertedRecords.recordConversionStats(), 3, 2,
records.sizeInBytes(), buffer.limit());
} else {
assertEquals(2, batches.size());
@@ -563,7 +563,7 @@ public class MemoryRecordsBuilderTest {
assertEquals(2, batches.get(1).baseOffset());
assertEquals("1", utf8(logRecords.get(0).key()));
assertEquals("3", utf8(logRecords.get(1).key()));
- verifyRecordsProcessingStats(convertedRecords.recordsProcessingStats(), 3, 1,
+ verifyRecordsProcessingStats(convertedRecords.recordConversionStats(), 3, 1,
records.sizeInBytes(), buffer.limit());
}
}
@@ -678,8 +678,8 @@ public class MemoryRecordsBuilderTest {
assertTrue("Memory usage too high: " + memUsed, iterations < 100);
}
- private void verifyRecordsProcessingStats(RecordsProcessingStats processingStats, int numRecords,
- int numRecordsConverted, long finalBytes, long preConvertedBytes) {
+ private void verifyRecordsProcessingStats(RecordConversionStats processingStats, int numRecords,
+ int numRecordsConverted, long finalBytes, long preConvertedBytes) {
assertNotNull("Records processing info is null", processingStats);
assertEquals(numRecordsConverted, processingStats.numRecordsConverted());
// Since nanoTime accuracy on build machines may not be sufficient to measure small conversion times,
diff --git a/clients/src/test/java/org/apache/kafka/common/network/MultiSendTest.java b/clients/src/test/java/org/apache/kafka/common/record/MultiRecordsSendTest.java
similarity index 88%
rename from clients/src/test/java/org/apache/kafka/common/network/MultiSendTest.java
rename to clients/src/test/java/org/apache/kafka/common/record/MultiRecordsSendTest.java
index d2b2ef6..38d381c7 100644
--- a/clients/src/test/java/org/apache/kafka/common/network/MultiSendTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/record/MultiRecordsSendTest.java
@@ -14,8 +14,10 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.kafka.common.network;
+package org.apache.kafka.common.record;
+import org.apache.kafka.common.network.ByteBufferSend;
+import org.apache.kafka.common.network.Send;
import org.apache.kafka.test.TestUtils;
import org.junit.Test;
@@ -27,7 +29,7 @@ import java.util.Queue;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
-public class MultiSendTest {
+public class MultiRecordsSendTest {
@Test
public void testSendsFreedAfterWriting() throws IOException {
@@ -45,7 +47,7 @@ public class MultiSendTest {
sends.add(new ByteBufferSend(dest, buffer));
}
- MultiSend send = new MultiSend(dest, sends);
+ MultiRecordsSend send = new MultiRecordsSend(dest, sends);
assertEquals(totalSize, send.size());
for (int i = 0; i < numChunks; i++) {
@@ -69,7 +71,7 @@ public class MultiSendTest {
@Override
public long write(ByteBuffer[] srcs) throws IOException {
// Instead of overflowing, this channel refuses additional writes once the buffer is full,
- // which allows us to test the MultiSend behavior on a per-send basis.
+ // which allows us to test the MultiRecordsSend behavior on a per-send basis.
if (!buffer().hasRemaining())
return 0;
return super.write(srcs);
diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index 118288b..777dbb5 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -48,11 +48,11 @@ import java.util.regex.Pattern
object LogAppendInfo {
val UnknownLogAppendInfo = LogAppendInfo(None, -1, RecordBatch.NO_TIMESTAMP, -1L, RecordBatch.NO_TIMESTAMP, -1L,
- RecordsProcessingStats.EMPTY, NoCompressionCodec, NoCompressionCodec, -1, -1, offsetsMonotonic = false)
+ RecordConversionStats.EMPTY, NoCompressionCodec, NoCompressionCodec, -1, -1, offsetsMonotonic = false)
def unknownLogAppendInfoWithLogStartOffset(logStartOffset: Long): LogAppendInfo =
LogAppendInfo(None, -1, RecordBatch.NO_TIMESTAMP, -1L, RecordBatch.NO_TIMESTAMP, logStartOffset,
- RecordsProcessingStats.EMPTY, NoCompressionCodec, NoCompressionCodec, -1, -1, offsetsMonotonic = false)
+ RecordConversionStats.EMPTY, NoCompressionCodec, NoCompressionCodec, -1, -1, offsetsMonotonic = false)
}
/**
@@ -65,7 +65,7 @@ object LogAppendInfo {
* @param offsetOfMaxTimestamp The offset of the message with the maximum timestamp.
* @param logAppendTime The log append time (if used) of the message set, otherwise Message.NoTimestamp
* @param logStartOffset The start offset of the log at the time of this append.
- * @param recordsProcessingStats Statistics collected during record processing, `null` if `assignOffsets` is `false`
+ * @param recordConversionStats Statistics collected during record processing, `null` if `assignOffsets` is `false`
* @param sourceCodec The source codec used in the message set (send by the producer)
* @param targetCodec The target codec of the message set(after applying the broker compression configuration if any)
* @param shallowCount The number of shallow messages
@@ -78,7 +78,7 @@ case class LogAppendInfo(var firstOffset: Option[Long],
var offsetOfMaxTimestamp: Long,
var logAppendTime: Long,
var logStartOffset: Long,
- var recordsProcessingStats: RecordsProcessingStats,
+ var recordConversionStats: RecordConversionStats,
sourceCodec: CompressionCodec,
targetCodec: CompressionCodec,
shallowCount: Int,
@@ -693,7 +693,7 @@ class Log(@volatile var dir: File,
appendInfo.maxTimestamp = validateAndOffsetAssignResult.maxTimestamp
appendInfo.offsetOfMaxTimestamp = validateAndOffsetAssignResult.shallowOffsetOfMaxTimestamp
appendInfo.lastOffset = offset.value - 1
- appendInfo.recordsProcessingStats = validateAndOffsetAssignResult.recordsProcessingStats
+ appendInfo.recordConversionStats = validateAndOffsetAssignResult.recordConversionStats
if (config.messageTimestampType == TimestampType.LOG_APPEND_TIME)
appendInfo.logAppendTime = now
@@ -939,7 +939,7 @@ class Log(@volatile var dir: File,
// Apply broker-side compression if any
val targetCodec = BrokerCompressionCodec.getTargetCompressionCodec(config.compressionType, sourceCodec)
LogAppendInfo(firstOffset, lastOffset, maxTimestamp, offsetOfMaxTimestamp, RecordBatch.NO_TIMESTAMP, logStartOffset,
- RecordsProcessingStats.EMPTY, sourceCodec, targetCodec, shallowMessageCount, validBytesCount, monotonic)
+ RecordConversionStats.EMPTY, sourceCodec, targetCodec, shallowMessageCount, validBytesCount, monotonic)
}
private def updateProducers(batch: RecordBatch,
diff --git a/core/src/main/scala/kafka/log/LogValidator.scala b/core/src/main/scala/kafka/log/LogValidator.scala
index 6515260..2cfbf7d 100644
--- a/core/src/main/scala/kafka/log/LogValidator.scala
+++ b/core/src/main/scala/kafka/log/LogValidator.scala
@@ -22,7 +22,7 @@ import kafka.common.LongRef
import kafka.message.{CompressionCodec, NoCompressionCodec}
import kafka.utils.Logging
import org.apache.kafka.common.errors.{InvalidTimestampException, UnsupportedForMessageFormatException}
-import org.apache.kafka.common.record._
+import org.apache.kafka.common.record.{AbstractRecords, CompressionType, InvalidRecordException, MemoryRecords, Record, RecordBatch, RecordConversionStats, TimestampType}
import org.apache.kafka.common.utils.Time
import scala.collection.mutable
@@ -155,14 +155,14 @@ private[kafka] object LogValidator extends Logging {
val convertedRecords = builder.build()
val info = builder.info
- val recordsProcessingStats = new RecordsProcessingStats(builder.uncompressedBytesWritten,
+ val recordConversionStats = new RecordConversionStats(builder.uncompressedBytesWritten,
builder.numRecords, time.nanoseconds - startNanos)
ValidationAndOffsetAssignResult(
validatedRecords = convertedRecords,
maxTimestamp = info.maxTimestamp,
shallowOffsetOfMaxTimestamp = info.shallowOffsetOfMaxTimestamp,
messageSizeMaybeChanged = true,
- recordsProcessingStats = recordsProcessingStats)
+ recordConversionStats = recordConversionStats)
}
private def assignOffsetsNonCompressed(records: MemoryRecords,
@@ -224,7 +224,7 @@ private[kafka] object LogValidator extends Logging {
maxTimestamp = maxTimestamp,
shallowOffsetOfMaxTimestamp = offsetOfMaxTimestamp,
messageSizeMaybeChanged = false,
- recordsProcessingStats = RecordsProcessingStats.EMPTY)
+ recordConversionStats = RecordConversionStats.EMPTY)
}
/**
@@ -315,12 +315,12 @@ private[kafka] object LogValidator extends Logging {
if (toMagic >= RecordBatch.MAGIC_VALUE_V2)
batch.setPartitionLeaderEpoch(partitionLeaderEpoch)
- val recordsProcessingStats = new RecordsProcessingStats(uncompressedSizeInBytes, 0, -1)
+ val recordConversionStats = new RecordConversionStats(uncompressedSizeInBytes, 0, 0)
ValidationAndOffsetAssignResult(validatedRecords = records,
maxTimestamp = maxTimestamp,
shallowOffsetOfMaxTimestamp = lastOffset,
messageSizeMaybeChanged = false,
- recordsProcessingStats = recordsProcessingStats)
+ recordConversionStats = recordConversionStats)
}
}
@@ -358,7 +358,7 @@ private[kafka] object LogValidator extends Logging {
// message format V0 or if the inner offsets are not consecutive. This is OK since the impact is the same: we have
// to rebuild the records (including recompression if enabled).
val conversionCount = builder.numRecords
- val recordsProcessingStats = new RecordsProcessingStats(uncompresssedSizeInBytes + builder.uncompressedBytesWritten,
+ val recordConversionStats = new RecordConversionStats(uncompresssedSizeInBytes + builder.uncompressedBytesWritten,
conversionCount, time.nanoseconds - startNanos)
ValidationAndOffsetAssignResult(
@@ -366,7 +366,7 @@ private[kafka] object LogValidator extends Logging {
maxTimestamp = info.maxTimestamp,
shallowOffsetOfMaxTimestamp = info.shallowOffsetOfMaxTimestamp,
messageSizeMaybeChanged = true,
- recordsProcessingStats = recordsProcessingStats)
+ recordConversionStats = recordConversionStats)
}
private def validateKey(record: Record, compactedTopic: Boolean) {
@@ -397,6 +397,6 @@ private[kafka] object LogValidator extends Logging {
maxTimestamp: Long,
shallowOffsetOfMaxTimestamp: Long,
messageSizeMaybeChanged: Boolean,
- recordsProcessingStats: RecordsProcessingStats)
+ recordConversionStats: RecordConversionStats)
}
diff --git a/core/src/main/scala/kafka/network/RequestChannel.scala b/core/src/main/scala/kafka/network/RequestChannel.scala
index f45e0ce..eecce1d 100644
--- a/core/src/main/scala/kafka/network/RequestChannel.scala
+++ b/core/src/main/scala/kafka/network/RequestChannel.scala
@@ -25,9 +25,11 @@ import com.typesafe.scalalogging.Logger
import com.yammer.metrics.core.{Gauge, Meter}
import kafka.metrics.KafkaMetricsGroup
import kafka.utils.{Logging, NotNothing}
+import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.memory.MemoryPool
import org.apache.kafka.common.network.Send
import org.apache.kafka.common.protocol.{ApiKeys, Errors}
+import org.apache.kafka.common.record.RecordConversionStats
import org.apache.kafka.common.requests._
import org.apache.kafka.common.security.auth.KafkaPrincipal
import org.apache.kafka.common.utils.{Sanitizer, Time}
@@ -181,12 +183,8 @@ object RequestChannel extends Logging {
if (isRequestLoggingEnabled) {
val detailsEnabled = requestLogger.underlying.isTraceEnabled
- val responseString =
- if (response.responseSend.isDefined)
- response.responseAsString.getOrElse(
- throw new IllegalStateException("responseAsString should always be defined if request logging is enabled"))
- else ""
-
+ val responseString = response.responseString.getOrElse(
+ throw new IllegalStateException("responseAsString should always be defined if request logging is enabled"))
val builder = new StringBuilder(256)
builder.append("Completed request:").append(requestDesc(detailsEnabled))
.append(",response:").append(responseString)
@@ -225,24 +223,55 @@ object RequestChannel extends Logging {
}
- /** responseAsString should only be defined if request logging is enabled */
- class Response(val request: Request, val responseSend: Option[Send], val responseAction: ResponseAction,
- val responseAsString: Option[String]) {
- request.responseCompleteTimeNanos = Time.SYSTEM.nanoseconds
- if (request.apiLocalCompleteTimeNanos == -1L) request.apiLocalCompleteTimeNanos = Time.SYSTEM.nanoseconds
+ abstract class Response(val request: Request) {
+ locally {
+ val nowNs = Time.SYSTEM.nanoseconds
+ request.responseCompleteTimeNanos = nowNs
+ if (request.apiLocalCompleteTimeNanos == -1L)
+ request.apiLocalCompleteTimeNanos = nowNs
+ }
def processor: Int = request.processor
- override def toString =
- s"Response(request=$request, responseSend=$responseSend, responseAction=$responseAction, responseAsString=$responseAsString)"
+ def responseString: Option[String] = Some("")
+
+ def onComplete: Option[Send => Unit] = None
+
+ override def toString: String
}
- sealed trait ResponseAction
- case object SendAction extends ResponseAction
- case object NoOpAction extends ResponseAction
- case object CloseConnectionAction extends ResponseAction
- case object StartThrottlingAction extends ResponseAction
- case object EndThrottlingAction extends ResponseAction
+ /** responseAsString should only be defined if request logging is enabled */
+ class SendResponse(request: Request,
+ val responseSend: Send,
+ val responseAsString: Option[String],
+ val onCompleteCallback: Option[Send => Unit]) extends Response(request) {
+ override def responseString: Option[String] = responseAsString
+
+ override def onComplete: Option[Send => Unit] = onCompleteCallback
+
+ override def toString: String =
+ s"Response(type=Send, request=$request, send=$responseSend, asString=$responseAsString)"
+ }
+
+ class NoOpResponse(request: Request) extends Response(request) {
+ override def toString: String =
+ s"Response(type=NoOp, request=$request)"
+ }
+
+ class CloseConnectionResponse(request: Request) extends Response(request) {
+ override def toString: String =
+ s"Response(type=CloseConnection, request=$request)"
+ }
+
+ class StartThrottlingResponse(request: Request) extends Response(request) {
+ override def toString: String =
+ s"Response(type=StartThrottling, request=$request)"
+ }
+
+ class EndThrottlingResponse(request: Request) extends Response(request) {
+ override def toString: String =
+ s"Response(type=EndThrottling, request=$request)"
+ }
}
class RequestChannel(val queueSize: Int) extends KafkaMetricsGroup {
@@ -287,16 +316,16 @@ class RequestChannel(val queueSize: Int) extends KafkaMetricsGroup {
def sendResponse(response: RequestChannel.Response) {
if (isTraceEnabled) {
val requestHeader = response.request.header
- val message = response.responseAction match {
- case SendAction =>
- s"Sending ${requestHeader.apiKey} response to client ${requestHeader.clientId} of ${response.responseSend.get.size} bytes."
- case NoOpAction =>
+ val message = response match {
+ case sendResponse: SendResponse =>
+ s"Sending ${requestHeader.apiKey} response to client ${requestHeader.clientId} of ${sendResponse.responseSend.size} bytes."
+ case _: NoOpResponse =>
s"Not sending ${requestHeader.apiKey} response to client ${requestHeader.clientId} as it's not required."
- case CloseConnectionAction =>
+ case _: CloseConnectionResponse =>
s"Closing connection for client ${requestHeader.clientId} due to error during ${requestHeader.apiKey}."
- case StartThrottlingAction =>
+ case _: StartThrottlingResponse =>
s"Notifying channel throttling has started for client ${requestHeader.clientId} for ${requestHeader.apiKey}"
- case EndThrottlingAction =>
+ case _: EndThrottlingResponse =>
s"Notifying channel throttling has ended for client ${requestHeader.clientId} for ${requestHeader.apiKey}"
}
trace(message)
diff --git a/core/src/main/scala/kafka/network/SocketServer.scala b/core/src/main/scala/kafka/network/SocketServer.scala
index 759396d..db5eda6 100644
--- a/core/src/main/scala/kafka/network/SocketServer.scala
+++ b/core/src/main/scala/kafka/network/SocketServer.scala
@@ -28,6 +28,7 @@ import com.yammer.metrics.core.Gauge
import kafka.cluster.{BrokerEndPoint, EndPoint}
import kafka.common.KafkaException
import kafka.metrics.KafkaMetricsGroup
+import kafka.network.RequestChannel.{CloseConnectionResponse, EndThrottlingResponse, NoOpResponse, SendResponse, StartThrottlingResponse}
import kafka.security.CredentialProvider
import kafka.server.KafkaConfig
import kafka.utils._
@@ -37,6 +38,7 @@ import org.apache.kafka.common.metrics._
import org.apache.kafka.common.metrics.stats.Meter
import org.apache.kafka.common.network.KafkaChannel.ChannelMuteEvent
import org.apache.kafka.common.network.{ChannelBuilder, ChannelBuilders, KafkaChannel, ListenerName, Selectable, Send, Selector => KSelector}
+import org.apache.kafka.common.record.MultiRecordsSend
import org.apache.kafka.common.requests.{RequestContext, RequestHeader}
import org.apache.kafka.common.security.auth.SecurityProtocol
import org.apache.kafka.common.utils.{KafkaThread, LogContext, Time}
@@ -617,36 +619,37 @@ private[kafka] class Processor(val id: Int,
}
private def processNewResponses() {
- var curr: RequestChannel.Response = null
- while ({curr = dequeueResponse(); curr != null}) {
- val channelId = curr.request.context.connectionId
+ var currentResponse: RequestChannel.Response = null
+ while ({currentResponse = dequeueResponse(); currentResponse != null}) {
+ val channelId = currentResponse.request.context.connectionId
try {
- curr.responseAction match {
- case RequestChannel.NoOpAction =>
+ currentResponse match {
+ case response: NoOpResponse =>
// There is no response to send to the client, we need to read more pipelined requests
// that are sitting in the server's socket buffer
- updateRequestMetrics(curr)
- trace("Socket server received empty response to send, registering for read: " + curr)
+ updateRequestMetrics(response)
+ trace("Socket server received empty response to send, registering for read: " + response)
// Try unmuting the channel. If there was no quota violation and the channel has not been throttled,
// it will be unmuted immediately. If the channel has been throttled, it will be unmuted only if the
// throttling delay has already passed by now.
handleChannelMuteEvent(channelId, ChannelMuteEvent.RESPONSE_SENT)
tryUnmuteChannel(channelId)
- case RequestChannel.SendAction =>
- val responseSend = curr.responseSend.getOrElse(
- throw new IllegalStateException(s"responseSend must be defined for SendAction, response: $curr"))
- sendResponse(curr, responseSend)
- case RequestChannel.CloseConnectionAction =>
- updateRequestMetrics(curr)
+
+ case response: SendResponse =>
+ sendResponse(response, response.responseSend)
+ case response: CloseConnectionResponse =>
+ updateRequestMetrics(response)
trace("Closing socket connection actively according to the response code.")
close(channelId)
- case RequestChannel.StartThrottlingAction =>
+ case response: StartThrottlingResponse =>
handleChannelMuteEvent(channelId, ChannelMuteEvent.THROTTLE_STARTED)
- case RequestChannel.EndThrottlingAction =>
+ case response: EndThrottlingResponse =>
// Try unmuting the channel. The channel will be unmuted only if the response has already been sent out to
// the client.
handleChannelMuteEvent(channelId, ChannelMuteEvent.THROTTLE_ENDED)
tryUnmuteChannel(channelId)
+ case _ =>
+ throw new IllegalArgumentException(s"Unknown response type: ${currentResponse.getClass}")
}
} catch {
case e: Throwable =>
@@ -713,10 +716,13 @@ private[kafka] class Processor(val id: Int,
private def processCompletedSends() {
selector.completedSends.asScala.foreach { send =>
try {
- val resp = inflightResponses.remove(send.destination).getOrElse {
+ val response = inflightResponses.remove(send.destination).getOrElse {
throw new IllegalStateException(s"Send for ${send.destination} completed, but not in `inflightResponses`")
}
- updateRequestMetrics(resp)
+ updateRequestMetrics(response)
+
+ // Invoke send completion callback
+ response.onComplete.foreach(onComplete => onComplete(send))
// Try unmuting the channel. If there was no quota violation and the channel has not been throttled,
// it will be unmuted immediately. If the channel has been throttled, it will unmuted only if the throttling
@@ -730,7 +736,7 @@ private[kafka] class Processor(val id: Int,
}
}
- private def updateRequestMetrics(response: RequestChannel.Response) {
+ private def updateRequestMetrics(response: RequestChannel.Response): Unit = {
val request = response.request
val networkThreadTimeNanos = openOrClosingChannel(request.context.connectionId).fold(0L)(_.getAndResetNetworkThreadTimeNanos())
request.updateRequestMetrics(networkThreadTimeNanos, response)
diff --git a/core/src/main/scala/kafka/server/ClientQuotaManager.scala b/core/src/main/scala/kafka/server/ClientQuotaManager.scala
index 73b40d1..41ee420 100644
--- a/core/src/main/scala/kafka/server/ClientQuotaManager.scala
+++ b/core/src/main/scala/kafka/server/ClientQuotaManager.scala
@@ -269,18 +269,11 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig,
* @param channelThrottlingCallback Callback for channel throttling
* @return ThrottledChannel object
*/
- def throttle(request: RequestChannel.Request, throttleTimeMs: Int,
- channelThrottlingCallback: (ResponseAction) => Unit) {
- throttle(request.session, request.header.clientId, throttleTimeMs, channelThrottlingCallback)
- }
-
- def throttle(session: Session, clientId: String, throttleTimeMs: Int,
- channelThrottlingCallback: (ResponseAction) => Unit) {
+ def throttle(request: RequestChannel.Request, throttleTimeMs: Int, channelThrottlingCallback: Response => Unit): Unit = {
if (throttleTimeMs > 0) {
- val clientSensors = getOrCreateQuotaSensors(session, clientId)
-
+ val clientSensors = getOrCreateQuotaSensors(request.session, request.header.clientId)
clientSensors.throttleTimeSensor.record(throttleTimeMs)
- val throttledChannel = new ThrottledChannel(time, throttleTimeMs, channelThrottlingCallback)
+ val throttledChannel = new ThrottledChannel(request, time, throttleTimeMs, channelThrottlingCallback)
delayQueue.add(throttledChannel)
delayQueueSensor.record()
debug("Channel throttled for sensor (%s). Delay time: (%d)".format(clientSensors.quotaSensor.name(), throttleTimeMs))
diff --git a/core/src/main/scala/kafka/server/FetchSession.scala b/core/src/main/scala/kafka/server/FetchSession.scala
index 3810d90..7a47780 100644
--- a/core/src/main/scala/kafka/server/FetchSession.scala
+++ b/core/src/main/scala/kafka/server/FetchSession.scala
@@ -25,9 +25,9 @@ import kafka.metrics.KafkaMetricsGroup
import kafka.utils.Logging
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.record.Records
import org.apache.kafka.common.requests.FetchMetadata.{FINAL_EPOCH, INITIAL_EPOCH, INVALID_SESSION_ID}
-import org.apache.kafka.common.requests.{FetchRequest, FetchResponse}
-import org.apache.kafka.common.requests.{FetchMetadata => JFetchMetadata}
+import org.apache.kafka.common.requests.{FetchRequest, FetchResponse, FetchMetadata => JFetchMetadata}
import org.apache.kafka.common.utils.{ImplicitLinkedHashSet, Time, Utils}
import scala.math.Ordered.orderingToOrdered
@@ -36,9 +36,9 @@ import scala.collection.JavaConverters._
object FetchSession {
type REQ_MAP = util.Map[TopicPartition, FetchRequest.PartitionData]
- type RESP_MAP = util.LinkedHashMap[TopicPartition, FetchResponse.PartitionData]
+ type RESP_MAP = util.LinkedHashMap[TopicPartition, FetchResponse.PartitionData[Records]]
type CACHE_MAP = ImplicitLinkedHashSet[CachedPartition]
- type RESP_MAP_ITER = util.Iterator[util.Map.Entry[TopicPartition, FetchResponse.PartitionData]]
+ type RESP_MAP_ITER = util.Iterator[util.Map.Entry[TopicPartition, FetchResponse.PartitionData[Records]]]
val NUM_INCREMENTAL_FETCH_SESSISONS = "NumIncrementalFetchSessions"
val NUM_INCREMENTAL_FETCH_PARTITIONS_CACHED = "NumIncrementalFetchPartitionsCached"
@@ -100,7 +100,7 @@ class CachedPartition(val topic: String,
reqData.logStartOffset, -1)
def this(part: TopicPartition, reqData: FetchRequest.PartitionData,
- respData: FetchResponse.PartitionData) =
+ respData: FetchResponse.PartitionData[Records]) =
this(part.topic(), part.partition(),
reqData.maxBytes, reqData.fetchOffset, respData.highWatermark,
reqData.logStartOffset, respData.logStartOffset)
@@ -126,7 +126,7 @@ class CachedPartition(val topic: String,
* @param updateResponseData if set to true, update this CachedPartition with new request and response data.
* @return True if this partition should be included in the response; false if it can be omitted.
*/
- def maybeUpdateResponseData(respData: FetchResponse.PartitionData, updateResponseData: Boolean): Boolean = {
+ def maybeUpdateResponseData(respData: FetchResponse.PartitionData[Records], updateResponseData: Boolean): Boolean = {
// Check the response data.
var mustRespond = false
if ((respData.records != null) && (respData.records.sizeInBytes() > 0)) {
@@ -286,7 +286,7 @@ trait FetchContext extends Logging {
* Updates the fetch context with new partition information. Generates response data.
* The response data may require subsequent down-conversion.
*/
- def updateAndGenerateResponseData(updates: FetchSession.RESP_MAP): FetchResponse
+ def updateAndGenerateResponseData(updates: FetchSession.RESP_MAP): FetchResponse[Records]
def partitionsToLogString(partitions: util.Collection[TopicPartition]): String =
FetchSession.partitionsToLogString(partitions, isTraceEnabled)
@@ -306,7 +306,7 @@ class SessionErrorContext(val error: Errors,
}
// Because of the fetch session error, we don't know what partitions were supposed to be in this request.
- override def updateAndGenerateResponseData(updates: FetchSession.RESP_MAP): FetchResponse = {
+ override def updateAndGenerateResponseData(updates: FetchSession.RESP_MAP): FetchResponse[Records] = {
debug(s"Session error fetch context returning $error")
new FetchResponse(error, new FetchSession.RESP_MAP, 0, INVALID_SESSION_ID)
}
@@ -329,7 +329,7 @@ class SessionlessFetchContext(val fetchData: util.Map[TopicPartition, FetchReque
FetchResponse.sizeOf(versionId, updates.entrySet().iterator())
}
- override def updateAndGenerateResponseData(updates: FetchSession.RESP_MAP): FetchResponse = {
+ override def updateAndGenerateResponseData(updates: FetchSession.RESP_MAP): FetchResponse[Records] = {
debug(s"Sessionless fetch context returning ${partitionsToLogString(updates.keySet())}")
new FetchResponse(Errors.NONE, updates, 0, INVALID_SESSION_ID)
}
@@ -360,7 +360,7 @@ class FullFetchContext(private val time: Time,
FetchResponse.sizeOf(versionId, updates.entrySet().iterator())
}
- override def updateAndGenerateResponseData(updates: FetchSession.RESP_MAP): FetchResponse = {
+ override def updateAndGenerateResponseData(updates: FetchSession.RESP_MAP): FetchResponse[Records] = {
def createNewSession(): FetchSession.CACHE_MAP = {
val cachedPartitions = new FetchSession.CACHE_MAP(updates.size())
updates.entrySet().asScala.foreach(entry => {
@@ -407,7 +407,7 @@ class IncrementalFetchContext(private val time: Time,
private class PartitionIterator(val iter: FetchSession.RESP_MAP_ITER,
val updateFetchContextAndRemoveUnselected: Boolean)
extends FetchSession.RESP_MAP_ITER {
- var nextElement: util.Map.Entry[TopicPartition, FetchResponse.PartitionData] = null
+ var nextElement: util.Map.Entry[TopicPartition, FetchResponse.PartitionData[Records]] = null
override def hasNext: Boolean = {
while ((nextElement == null) && iter.hasNext()) {
@@ -431,7 +431,7 @@ class IncrementalFetchContext(private val time: Time,
nextElement != null
}
- override def next(): util.Map.Entry[TopicPartition, FetchResponse.PartitionData] = {
+ override def next(): util.Map.Entry[TopicPartition, FetchResponse.PartitionData[Records]] = {
if (!hasNext()) throw new NoSuchElementException()
val element = nextElement
nextElement = null
@@ -453,7 +453,7 @@ class IncrementalFetchContext(private val time: Time,
}
}
- override def updateAndGenerateResponseData(updates: FetchSession.RESP_MAP): FetchResponse = {
+ override def updateAndGenerateResponseData(updates: FetchSession.RESP_MAP): FetchResponse[Records] = {
session.synchronized {
// Check to make sure that the session epoch didn't change in between
// creating this fetch context and generating this response.
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index abca2f0..dcdfae0 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -17,51 +17,49 @@
package kafka.server
-import java.nio.ByteBuffer
import java.lang.{Long => JLong}
-import java.util.{Collections, Properties}
+import java.nio.ByteBuffer
import java.util
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicInteger
+import java.util.{Collections, Properties}
import kafka.admin.{AdminUtils, RackAwareMode}
import kafka.api.{ApiVersion, KAFKA_0_11_0_IV0}
import kafka.cluster.Partition
import kafka.common.{OffsetAndMetadata, OffsetMetadata}
-import kafka.server.QuotaFactory.{QuotaManagers, UnboundedQuota}
import kafka.controller.KafkaController
import kafka.coordinator.group.{GroupCoordinator, JoinGroupResult}
import kafka.coordinator.transaction.{InitProducerIdResult, TransactionCoordinator}
import kafka.log.{Log, LogManager, TimestampOffset}
import kafka.network.RequestChannel
-import kafka.network.RequestChannel._
import kafka.security.SecurityUtils
import kafka.security.auth.{Resource, _}
+import kafka.server.QuotaFactory.{QuotaManagers, UnboundedQuota}
import kafka.utils.{CoreUtils, Logging}
import kafka.zk.{AdminZkClient, KafkaZkClient}
+import org.apache.kafka.common.acl.{AccessControlEntry, AclBinding}
import org.apache.kafka.common.errors._
import org.apache.kafka.common.internals.FatalExitError
import org.apache.kafka.common.internals.Topic.{GROUP_METADATA_TOPIC_NAME, TRANSACTION_STATE_TOPIC_NAME, isInternal}
+import org.apache.kafka.common.requests.FetchMetadata.INVALID_SESSION_ID
import org.apache.kafka.common.metrics.Metrics
-import org.apache.kafka.common.network.ListenerName
+import org.apache.kafka.common.network.{ListenerName, Send}
import org.apache.kafka.common.protocol.{ApiKeys, Errors}
-import org.apache.kafka.common.record._
+import org.apache.kafka.common.record.{BaseRecords, ControlRecordType, EndTransactionMarker, LazyDownConversionRecords, MemoryRecords, MultiRecordsSend, RecordBatch, RecordConversionStats, Records}
import org.apache.kafka.common.requests.CreateAclsResponse.AclCreationResponse
import org.apache.kafka.common.requests.DeleteAclsResponse.{AclDeletionResult, AclFilterResponse}
-import org.apache.kafka.common.requests.{Resource => RResource, ResourceType => RResourceType, _}
+import org.apache.kafka.common.requests.DescribeLogDirsResponse.LogDirInfo
import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
-import org.apache.kafka.common.utils.{Time, Utils}
-import org.apache.kafka.common.{Node, TopicPartition}
-import org.apache.kafka.common.requests.{SaslAuthenticateResponse, SaslHandshakeResponse}
-import org.apache.kafka.common.requests.FetchMetadata.INVALID_SESSION_ID
+import org.apache.kafka.common.requests.{Resource => RResource, ResourceType => RResourceType, _}
import org.apache.kafka.common.resource.{Resource => AdminResource}
-import org.apache.kafka.common.acl.{AccessControlEntry, AclBinding}
-import DescribeLogDirsResponse.LogDirInfo
import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
import org.apache.kafka.common.security.token.delegation.{DelegationToken, TokenInformation}
+import org.apache.kafka.common.utils.{Time, Utils}
+import org.apache.kafka.common.{Node, TopicPartition}
-import scala.collection._
import scala.collection.JavaConverters._
+import scala.collection._
import scala.collection.mutable.ArrayBuffer
import scala.util.{Failure, Success, Try}
@@ -87,6 +85,7 @@ class KafkaApis(val requestChannel: RequestChannel,
time: Time,
val tokenManager: DelegationTokenManager) extends Logging {
+ type FetchResponseStats = Map[TopicPartition, RecordConversionStats]
this.logIdent = "[KafkaApi-%d] ".format(brokerId)
val adminZkClient = new AdminZkClient(zkClient)
@@ -410,7 +409,6 @@ class KafkaApis(val requestChannel: RequestChannel,
// the callback for sending a produce response
def sendResponseCallback(responseStatus: Map[TopicPartition, PartitionResponse]) {
-
val mergedResponseStatus = responseStatus ++ unauthorizedTopicResponses ++ nonExistingTopicResponses
var errorInResponse = false
@@ -436,9 +434,9 @@ class KafkaApis(val requestChannel: RequestChannel,
val maxThrottleTimeMs = Math.max(bandwidthThrottleTimeMs, requestThrottleTimeMs)
if (maxThrottleTimeMs > 0) {
if (bandwidthThrottleTimeMs > requestThrottleTimeMs) {
- quotas.produce.throttle(request, bandwidthThrottleTimeMs, sendActionOnlyResponse(request))
+ quotas.produce.throttle(request, bandwidthThrottleTimeMs, sendResponse)
} else {
- quotas.request.throttle(request, requestThrottleTimeMs, sendActionOnlyResponse(request))
+ quotas.request.throttle(request, requestThrottleTimeMs, sendResponse)
}
}
@@ -463,13 +461,13 @@ class KafkaApis(val requestChannel: RequestChannel,
sendNoOpResponseExemptThrottle(request)
}
} else {
- sendResponse(request, Some(new ProduceResponse(mergedResponseStatus.asJava, maxThrottleTimeMs)))
+ sendResponse(request, Some(new ProduceResponse(mergedResponseStatus.asJava, maxThrottleTimeMs)), None)
}
}
- def processingStatsCallback(processingStats: Map[TopicPartition, RecordsProcessingStats]): Unit = {
+ def processingStatsCallback(processingStats: FetchResponseStats): Unit = {
processingStats.foreach { case (tp, info) =>
- updateRecordsProcessingStats(request, tp, info)
+ updateRecordConversionStats(request, tp, info)
}
}
@@ -486,7 +484,7 @@ class KafkaApis(val requestChannel: RequestChannel,
isFromClient = true,
entriesPerPartition = authorizedRequestInfo,
responseCallback = sendResponseCallback,
- processingStatsCallback = processingStatsCallback)
+ recordConversionStatsCallback = processingStatsCallback)
// if the request is put into the purgatory, it will have a held reference and hence cannot be garbage collected;
// hence we clear its data here in order to let GC reclaim its memory since it is already appended to log
@@ -506,7 +504,7 @@ class KafkaApis(val requestChannel: RequestChannel,
fetchRequest.toForget(),
fetchRequest.isFromFollower())
- val erroneous = mutable.ArrayBuffer[(TopicPartition, FetchResponse.PartitionData)]()
+ val erroneous = mutable.ArrayBuffer[(TopicPartition, FetchResponse.PartitionData[Records])]()
val interesting = mutable.ArrayBuffer[(TopicPartition, FetchRequest.PartitionData)]()
if (fetchRequest.isFromFollower()) {
// The follower must have ClusterAction on ClusterResource in order to fetch partition data.
@@ -543,7 +541,7 @@ class KafkaApis(val requestChannel: RequestChannel,
})
}
- def convertedPartitionData(tp: TopicPartition, data: FetchResponse.PartitionData) = {
+ def convertRecords(tp: TopicPartition, unconvertedRecords: Records): BaseRecords = {
// Down-conversion of the fetched records is needed when the stored magic version is
// greater than that supported by the client (as indicated by the fetch request version). If the
// configured magic version for the topic is less than or equal to that supported by the version of the
@@ -553,9 +551,9 @@ class KafkaApis(val requestChannel: RequestChannel,
// which were written in the new format prior to the version downgrade.
replicaManager.getMagic(tp).flatMap { magic =>
val downConvertMagic = {
- if (magic > RecordBatch.MAGIC_VALUE_V0 && versionId <= 1 && !data.records.hasCompatibleMagic(RecordBatch.MAGIC_VALUE_V0))
+ if (magic > RecordBatch.MAGIC_VALUE_V0 && versionId <= 1 && !unconvertedRecords.hasCompatibleMagic(RecordBatch.MAGIC_VALUE_V0))
Some(RecordBatch.MAGIC_VALUE_V0)
- else if (magic > RecordBatch.MAGIC_VALUE_V1 && versionId <= 3 && !data.records.hasCompatibleMagic(RecordBatch.MAGIC_VALUE_V1))
+ else if (magic > RecordBatch.MAGIC_VALUE_V1 && versionId <= 3 && !unconvertedRecords.hasCompatibleMagic(RecordBatch.MAGIC_VALUE_V1))
Some(RecordBatch.MAGIC_VALUE_V1)
else
None
@@ -563,18 +561,19 @@ class KafkaApis(val requestChannel: RequestChannel,
downConvertMagic.map { magic =>
trace(s"Down converting records from partition $tp to message format version $magic for fetch request from $clientId")
- val converted = data.records.downConvert(magic, fetchContext.getFetchOffset(tp).get, time)
- updateRecordsProcessingStats(request, tp, converted.recordsProcessingStats)
- new FetchResponse.PartitionData(data.error, data.highWatermark, FetchResponse.INVALID_LAST_STABLE_OFFSET,
- data.logStartOffset, data.abortedTransactions, converted.records)
- }
- }.getOrElse(data)
+ // Because down-conversion is extremely memory intensive, we want to try and delay the down-conversion as much
+ // as possible. With KIP-283, we have the ability to lazily down-convert in a chunked manner. The lazy, chunked
+ // down-conversion always guarantees that at least one batch of messages is down-converted and sent out to the
+ // client.
+ new LazyDownConversionRecords(tp, unconvertedRecords, magic, fetchContext.getFetchOffset(tp).get, time)
+ }
+ }.getOrElse(unconvertedRecords)
}
// the callback for process a fetch response, invoked before throttling
- def processResponseCallback(responsePartitionData: Seq[(TopicPartition, FetchPartitionData)]) {
- val partitions = new util.LinkedHashMap[TopicPartition, FetchResponse.PartitionData]
+ def processResponseCallback(responsePartitionData: Seq[(TopicPartition, FetchPartitionData)]): Unit = {
+ val partitions = new util.LinkedHashMap[TopicPartition, FetchResponse.PartitionData[Records]]
responsePartitionData.foreach{ case (tp, data) =>
val abortedTransactions = data.abortedTransactions.map(_.asJava).orNull
val lastStableOffset = data.lastStableOffset.getOrElse(FetchResponse.INVALID_LAST_STABLE_OFFSET)
@@ -587,16 +586,23 @@ class KafkaApis(val requestChannel: RequestChannel,
// Record time before any byte-rate throttling.
request.apiRemoteCompleteTimeNanos = time.nanoseconds
- var unconvertedFetchResponse: FetchResponse = null
+ var unconvertedFetchResponse: FetchResponse[Records] = null
- def createResponse(throttleTimeMs: Int): FetchResponse = {
- val convertedData = new util.LinkedHashMap[TopicPartition, FetchResponse.PartitionData]
- unconvertedFetchResponse.responseData().asScala.foreach { case (tp, partitionData) =>
- if (partitionData.error != Errors.NONE)
+ def createResponse(throttleTimeMs: Int): FetchResponse[BaseRecords] = {
+ // Down-convert messages for each partition if required
+ val convertedData = new util.LinkedHashMap[TopicPartition, FetchResponse.PartitionData[BaseRecords]]
+ unconvertedFetchResponse.responseData().asScala.foreach { case (tp, unconvertedPartitionData) =>
+ if (unconvertedPartitionData.error != Errors.NONE)
debug(s"Fetch request with correlation id ${request.header.correlationId} from client $clientId " +
- s"on partition $tp failed due to ${partitionData.error.exceptionName}")
- convertedData.put(tp, convertedPartitionData(tp, partitionData))
+ s"on partition $tp failed due to ${unconvertedPartitionData.error.exceptionName}")
+ val convertedRecords = convertRecords(tp, unconvertedPartitionData.records)
+ val convertedPartitionData = new FetchResponse.PartitionData[BaseRecords](unconvertedPartitionData.error,
+ unconvertedPartitionData.highWatermark, FetchResponse.INVALID_LAST_STABLE_OFFSET, unconvertedPartitionData.logStartOffset,
+ unconvertedPartitionData.abortedTransactions, convertedRecords)
+ convertedData.put(tp, convertedPartitionData)
}
+
+ // Prepare fetch response from converted data
val response = new FetchResponse(unconvertedFetchResponse.error(), convertedData, throttleTimeMs,
unconvertedFetchResponse.sessionId())
response.responseData.asScala.foreach { case (topicPartition, data) =>
@@ -606,6 +612,16 @@ class KafkaApis(val requestChannel: RequestChannel,
response
}
+ def updateConversionStats(send: Send): Unit = {
+ send match {
+ case send: MultiRecordsSend if send.recordConversionStats != null =>
+ send.recordConversionStats.asScala.toMap.foreach {
+ case (tp, stats) => updateRecordConversionStats(request, tp, stats)
+ }
+ case _ =>
+ }
+ }
+
if (fetchRequest.isFromFollower) {
// We've already evaluated against the quota and are good to go. Just need to record it now.
unconvertedFetchResponse = fetchContext.updateAndGenerateResponseData(partitions)
@@ -613,7 +629,7 @@ class KafkaApis(val requestChannel: RequestChannel,
quotas.leader.record(responseSize)
trace(s"Sending Fetch response with partitions.size=${unconvertedFetchResponse.responseData().size()}, " +
s"metadata=${unconvertedFetchResponse.sessionId()}")
- sendResponseExemptThrottle(request, createResponse(0))
+ sendResponseExemptThrottle(request, createResponse(0), Some(updateConversionStats))
} else {
// Fetch size used to determine throttle time is calculated before any down conversions.
// This may be slightly different from the actual response size. But since down conversions
@@ -633,22 +649,21 @@ class KafkaApis(val requestChannel: RequestChannel,
// from the fetch quota because we are going to return an empty response.
quotas.fetch.unrecordQuotaSensor(request, responseSize, timeMs)
if (bandwidthThrottleTimeMs > requestThrottleTimeMs) {
- quotas.fetch.throttle(request, bandwidthThrottleTimeMs, sendActionOnlyResponse(request))
+ quotas.fetch.throttle(request, bandwidthThrottleTimeMs, sendResponse)
} else {
- quotas.request.throttle(request, requestThrottleTimeMs, sendActionOnlyResponse(request))
+ quotas.request.throttle(request, requestThrottleTimeMs, sendResponse)
}
// If throttling is required, return an empty response.
- unconvertedFetchResponse = new FetchResponse(Errors.NONE, new util.LinkedHashMap[TopicPartition, FetchResponse.PartitionData](),
- maxThrottleTimeMs, INVALID_SESSION_ID)
+ unconvertedFetchResponse = new FetchResponse(Errors.NONE, new util.LinkedHashMap[TopicPartition,
+ FetchResponse.PartitionData[Records]](), maxThrottleTimeMs, INVALID_SESSION_ID)
} else {
// Get the actual response. This will update the fetch context.
unconvertedFetchResponse = fetchContext.updateAndGenerateResponseData(partitions)
- trace(s"Sending Fetch response with partitions.size=${responseSize}, " +
- s"metadata=${unconvertedFetchResponse.sessionId()}")
+ trace(s"Sending Fetch response with partitions.size=${responseSize}, metadata=${unconvertedFetchResponse.sessionId()}")
}
// Send the response immediately.
- sendResponse(request, Some(createResponse(maxThrottleTimeMs)))
+ sendResponse(request, Some(createResponse(maxThrottleTimeMs)), Some(updateConversionStats))
}
}
@@ -669,12 +684,12 @@ class KafkaApis(val requestChannel: RequestChannel,
}
}
- class SelectingIterator(val partitions: util.LinkedHashMap[TopicPartition, FetchResponse.PartitionData],
+ class SelectingIterator(val partitions: util.LinkedHashMap[TopicPartition, FetchResponse.PartitionData[Records]],
val quota: ReplicationQuotaManager)
- extends util.Iterator[util.Map.Entry[TopicPartition, FetchResponse.PartitionData]] {
+ extends util.Iterator[util.Map.Entry[TopicPartition, FetchResponse.PartitionData[Records]]] {
val iter = partitions.entrySet().iterator()
- var nextElement: util.Map.Entry[TopicPartition, FetchResponse.PartitionData] = null
+ var nextElement: util.Map.Entry[TopicPartition, FetchResponse.PartitionData[Records]] = null
override def hasNext: Boolean = {
while ((nextElement == null) && iter.hasNext()) {
@@ -686,7 +701,7 @@ class KafkaApis(val requestChannel: RequestChannel,
nextElement != null
}
- override def next(): util.Map.Entry[TopicPartition, FetchResponse.PartitionData] = {
+ override def next(): util.Map.Entry[TopicPartition, FetchResponse.PartitionData[Records]] = {
if (!hasNext()) throw new NoSuchElementException()
val element = nextElement
nextElement = null
@@ -699,7 +714,7 @@ class KafkaApis(val requestChannel: RequestChannel,
// Traffic from both in-sync and out of sync replicas are accounted for in replication quota to ensure total replication
// traffic doesn't exceed quota.
private def sizeOfThrottledPartitions(versionId: Short,
- unconvertedResponse: FetchResponse,
+ unconvertedResponse: FetchResponse[Records],
quota: ReplicationQuotaManager): Int = {
val iter = new SelectingIterator(unconvertedResponse.responseData(), quota)
FetchResponse.sizeOf(versionId, iter)
@@ -2227,9 +2242,10 @@ class KafkaApis(val requestChannel: RequestChannel,
throw new ClusterAuthorizationException(s"Request $request is not authorized.")
}
- private def updateRecordsProcessingStats(request: RequestChannel.Request, tp: TopicPartition,
- processingStats: RecordsProcessingStats): Unit = {
- val conversionCount = processingStats.numRecordsConverted
+ private def updateRecordConversionStats(request: RequestChannel.Request,
+ tp: TopicPartition,
+ conversionStats: RecordConversionStats): Unit = {
+ val conversionCount = conversionStats.numRecordsConverted
if (conversionCount > 0) {
request.header.apiKey match {
case ApiKeys.PRODUCE =>
@@ -2241,9 +2257,9 @@ class KafkaApis(val requestChannel: RequestChannel,
case _ =>
throw new IllegalStateException("Message conversion info is recorded only for Produce/Fetch requests")
}
- request.messageConversionsTimeNanos = processingStats.conversionTimeNanos
+ request.messageConversionsTimeNanos = conversionStats.conversionTimeNanos
}
- request.temporaryMemoryBytes = processingStats.temporaryMemoryBytes
+ request.temporaryMemoryBytes = conversionStats.temporaryMemoryBytes
}
private def handleError(request: RequestChannel.Request, e: Throwable) {
@@ -2257,21 +2273,25 @@ class KafkaApis(val requestChannel: RequestChannel,
// Throttle the channel if the request quota is enabled but has been violated. Regardless of throttling, send the
// response immediately.
- private def sendResponseMaybeThrottle(request: RequestChannel.Request, createResponse: Int => AbstractResponse): Unit = {
+ private def sendResponseMaybeThrottle(request: RequestChannel.Request,
+ createResponse: Int => AbstractResponse,
+ onComplete: Option[Send => Unit] = None): Unit = {
val throttleTimeMs = quotas.request.maybeRecordAndGetThrottleTimeMs(request)
- quotas.request.throttle(request, throttleTimeMs, sendActionOnlyResponse(request))
- sendResponse(request, Some(createResponse(throttleTimeMs)))
+ quotas.request.throttle(request, throttleTimeMs, sendResponse)
+ sendResponse(request, Some(createResponse(throttleTimeMs)), onComplete)
}
private def sendErrorResponseMaybeThrottle(request: RequestChannel.Request, error: Throwable) {
val throttleTimeMs = quotas.request.maybeRecordAndGetThrottleTimeMs(request)
- quotas.request.throttle(request, throttleTimeMs, sendActionOnlyResponse(request))
+ quotas.request.throttle(request, throttleTimeMs, sendResponse)
sendErrorOrCloseConnection(request, error, throttleTimeMs)
}
- private def sendResponseExemptThrottle(request: RequestChannel.Request, response: AbstractResponse): Unit = {
+ private def sendResponseExemptThrottle(request: RequestChannel.Request,
+ response: AbstractResponse,
+ onComplete: Option[Send => Unit] = None): Unit = {
quotas.request.maybeRecordExempt(request)
- sendResponse(request, Some(response))
+ sendResponse(request, Some(response), onComplete)
}
private def sendErrorResponseExemptThrottle(request: RequestChannel.Request, error: Throwable): Unit = {
@@ -2285,38 +2305,41 @@ class KafkaApis(val requestChannel: RequestChannel,
if (response == null)
closeConnection(request, requestBody.errorCounts(error))
else
- sendResponse(request, Some(response))
+ sendResponse(request, Some(response), None)
}
private def sendNoOpResponseExemptThrottle(request: RequestChannel.Request): Unit = {
quotas.request.maybeRecordExempt(request)
- sendResponse(request, None)
+ sendResponse(request, None, None)
}
private def closeConnection(request: RequestChannel.Request, errorCounts: java.util.Map[Errors, Integer]): Unit = {
// This case is used when the request handler has encountered an error, but the client
// does not expect a response (e.g. when produce request has acks set to 0)
requestChannel.updateErrorMetrics(request.header.apiKey, errorCounts.asScala)
- sendActionOnlyResponse(request)(CloseConnectionAction)
+ requestChannel.sendResponse(new RequestChannel.CloseConnectionResponse(request))
}
- private def sendResponse(request: RequestChannel.Request, responseOpt: Option[AbstractResponse]): Unit = {
+ private def sendResponse(request: RequestChannel.Request,
+ responseOpt: Option[AbstractResponse],
+ onComplete: Option[Send => Unit]): Unit = {
// Update error metrics for each error code in the response including Errors.NONE
responseOpt.foreach(response => requestChannel.updateErrorMetrics(request.header.apiKey, response.errorCounts.asScala))
- responseOpt match {
+ val response = responseOpt match {
case Some(response) =>
val responseSend = request.context.buildResponse(response)
val responseString =
if (RequestChannel.isRequestLoggingEnabled) Some(response.toString(request.context.apiVersion))
else None
- requestChannel.sendResponse(new RequestChannel.Response(request, Some(responseSend), SendAction, responseString))
+ new RequestChannel.SendResponse(request, responseSend, responseString, onComplete)
case None =>
- sendActionOnlyResponse(request)(NoOpAction)
+ new RequestChannel.NoOpResponse(request)
}
+ sendResponse(response)
}
- private def sendActionOnlyResponse(request: RequestChannel.Request)(responseAction: ResponseAction): Unit = {
- requestChannel.sendResponse(new RequestChannel.Response(request, None, responseAction, None))
+ private def sendResponse(response: RequestChannel.Response): Unit = {
+ requestChannel.sendResponse(response)
}
}
diff --git a/core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala b/core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala
index ba7203e..5a505c3 100644
--- a/core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala
+++ b/core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala
@@ -20,19 +20,18 @@ package kafka.server
import java.nio.ByteBuffer
import java.util
-import AbstractFetcherThread.ResultWithPartitions
-import kafka.cluster.BrokerEndPoint
-import org.apache.kafka.common.TopicPartition
-import org.apache.kafka.common.requests.EpochEndOffset._
-import org.apache.kafka.common.requests.{EpochEndOffset, FetchResponse, FetchRequest => JFetchRequest}
-import ReplicaAlterLogDirsThread.FetchRequest
-import ReplicaAlterLogDirsThread.PartitionData
import kafka.api.Request
+import kafka.cluster.BrokerEndPoint
+import kafka.server.AbstractFetcherThread.ResultWithPartitions
import kafka.server.QuotaFactory.UnboundedQuota
+import kafka.server.ReplicaAlterLogDirsThread.{FetchRequest, PartitionData}
import kafka.server.epoch.LeaderEpochCache
+import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.errors.KafkaStorageException
import org.apache.kafka.common.protocol.{ApiKeys, Errors}
-import org.apache.kafka.common.record.{FileRecords, MemoryRecords}
+import org.apache.kafka.common.record.{FileRecords, MemoryRecords, Records}
+import org.apache.kafka.common.requests.EpochEndOffset._
+import org.apache.kafka.common.requests.{EpochEndOffset, FetchResponse, FetchRequest => JFetchRequest}
import scala.collection.JavaConverters._
import scala.collection.{Map, Seq, Set, mutable}
@@ -58,7 +57,7 @@ class ReplicaAlterLogDirsThread(name: String,
private val fetchSize = brokerConfig.replicaFetchMaxBytes
def fetch(fetchRequest: FetchRequest): Seq[(TopicPartition, PartitionData)] = {
- var partitionData: Seq[(TopicPartition, FetchResponse.PartitionData)] = null
+ var partitionData: Seq[(TopicPartition, FetchResponse.PartitionData[Records])] = null
val request = fetchRequest.underlying.build()
def processResponseCallback(responsePartitionData: Seq[(TopicPartition, FetchPartitionData)]) {
@@ -256,7 +255,7 @@ object ReplicaAlterLogDirsThread {
override def toString = underlying.toString
}
- private[server] class PartitionData(val underlying: FetchResponse.PartitionData) extends AbstractFetcherThread.PartitionData {
+ private[server] class PartitionData(val underlying: FetchResponse.PartitionData[Records]) extends AbstractFetcherThread.PartitionData {
def error = underlying.error
diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
index 72b6616..cf8d829 100644
--- a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
@@ -33,7 +33,7 @@ import org.apache.kafka.common.errors.KafkaStorageException
import org.apache.kafka.common.internals.FatalExitError
import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.protocol.Errors
-import org.apache.kafka.common.record.MemoryRecords
+import org.apache.kafka.common.record.{MemoryRecords, Records}
import org.apache.kafka.common.requests.{EpochEndOffset, FetchResponse, ListOffsetRequest, ListOffsetResponse, OffsetsForLeaderEpochRequest, OffsetsForLeaderEpochResponse, FetchRequest => JFetchRequest}
import org.apache.kafka.common.utils.{LogContext, Time}
@@ -224,7 +224,7 @@ class ReplicaFetcherThread(name: String,
protected def fetch(fetchRequest: FetchRequest): Seq[(TopicPartition, PartitionData)] = {
try {
val clientResponse = leaderEndpoint.sendRequest(fetchRequest.underlying)
- val fetchResponse = clientResponse.responseBody.asInstanceOf[FetchResponse]
+ val fetchResponse = clientResponse.responseBody.asInstanceOf[FetchResponse[Records]]
if (!fetchSessionHandler.handleResponse(fetchResponse)) {
Nil
} else {
@@ -389,7 +389,7 @@ object ReplicaFetcherThread {
override def toString = underlying.toString
}
- private[server] class PartitionData(val underlying: FetchResponse.PartitionData) extends AbstractFetcherThread.PartitionData {
+ private[server] class PartitionData(val underlying: FetchResponse.PartitionData[Records]) extends AbstractFetcherThread.PartitionData {
def error = underlying.error
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 5dbe25b..24f3235 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -31,21 +31,20 @@ import kafka.server.QuotaFactory.{QuotaManagers, UnboundedQuota}
import kafka.server.checkpoints.OffsetCheckpointFile
import kafka.utils._
import kafka.zk.KafkaZkClient
-import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.errors._
import org.apache.kafka.common.internals.Topic
import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.protocol.Errors
-import org.apache.kafka.common.protocol.Errors.UNKNOWN_TOPIC_OR_PARTITION
-import org.apache.kafka.common.protocol.Errors.KAFKA_STORAGE_ERROR
+import org.apache.kafka.common.protocol.Errors.{KAFKA_STORAGE_ERROR, UNKNOWN_TOPIC_OR_PARTITION}
import org.apache.kafka.common.record._
+import org.apache.kafka.common.requests.FetchResponse.AbortedTransaction
import org.apache.kafka.common.requests.DescribeLogDirsResponse.{LogDirInfo, ReplicaInfo}
import org.apache.kafka.common.requests.EpochEndOffset._
import org.apache.kafka.common.requests.FetchRequest.PartitionData
-import org.apache.kafka.common.requests.FetchResponse.AbortedTransaction
import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
import org.apache.kafka.common.requests._
import org.apache.kafka.common.utils.Time
+import org.apache.kafka.common.TopicPartition
import scala.collection.JavaConverters._
import scala.collection._
@@ -471,7 +470,7 @@ class ReplicaManager(val config: KafkaConfig,
entriesPerPartition: Map[TopicPartition, MemoryRecords],
responseCallback: Map[TopicPartition, PartitionResponse] => Unit,
delayedProduceLock: Option[Lock] = None,
- processingStatsCallback: Map[TopicPartition, RecordsProcessingStats] => Unit = _ => ()) {
+ recordConversionStatsCallback: Map[TopicPartition, RecordConversionStats] => Unit = _ => ()) {
if (isValidRequiredAcks(requiredAcks)) {
val sTime = time.milliseconds
val localProduceResults = appendToLocalLog(internalTopicsAllowed = internalTopicsAllowed,
@@ -485,7 +484,7 @@ class ReplicaManager(val config: KafkaConfig,
new PartitionResponse(result.error, result.info.firstOffset.getOrElse(-1), result.info.logAppendTime, result.info.logStartOffset)) // response status
}
- processingStatsCallback(localProduceResults.mapValues(_.info.recordsProcessingStats))
+ recordConversionStatsCallback(localProduceResults.mapValues(_.info.recordConversionStats))
if (delayedProduceRequestRequired(requiredAcks, entriesPerPartition, localProduceResults)) {
// create delayed produce operation
diff --git a/core/src/main/scala/kafka/server/ThrottledChannel.scala b/core/src/main/scala/kafka/server/ThrottledChannel.scala
index 74357d5..8fe8649 100644
--- a/core/src/main/scala/kafka/server/ThrottledChannel.scala
+++ b/core/src/main/scala/kafka/server/ThrottledChannel.scala
@@ -19,28 +19,31 @@ package kafka.server
import java.util.concurrent.{Delayed, TimeUnit}
-import kafka.network.RequestChannel.{EndThrottlingAction, ResponseAction, StartThrottlingAction}
+import kafka.network
+import kafka.network.RequestChannel
+import kafka.network.RequestChannel.Response
import kafka.utils.Logging
import org.apache.kafka.common.utils.Time
/**
* Represents a request whose response has been delayed.
- * @param time @Time instance to use
- * @param throttleTimeMs delay associated with this request
+ * @param request The request that has been delayed
+ * @param time Time instance to use
+ * @param throttleTimeMs Delay associated with this request
* @param channelThrottlingCallback Callback for channel throttling
*/
-class ThrottledChannel(val time: Time, val throttleTimeMs: Int, channelThrottlingCallback: (ResponseAction) => Unit)
+class ThrottledChannel(val request: RequestChannel.Request, val time: Time, val throttleTimeMs: Int, channelThrottlingCallback: Response => Unit)
extends Delayed with Logging {
var endTime = time.milliseconds + throttleTimeMs
// Notify the socket server that throttling has started for this channel.
- channelThrottlingCallback(StartThrottlingAction)
+ channelThrottlingCallback(new RequestChannel.StartThrottlingResponse(request))
// Notify the socket server that throttling has been done for this channel.
def notifyThrottlingDone(): Unit = {
trace("Channel throttled for: " + throttleTimeMs + " ms")
- channelThrottlingCallback(EndThrottlingAction)
+ channelThrottlingCallback(new network.RequestChannel.EndThrottlingResponse(request))
}
override def getDelay(unit: TimeUnit): Long = {
diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
index da45be2..681497e 100644
--- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
@@ -19,8 +19,7 @@ import java.util.regex.Pattern
import java.util.{ArrayList, Collections, Properties}
import kafka.admin.AdminClient
-import kafka.admin.ConsumerGroupCommand.ConsumerGroupCommandOptions
-import kafka.admin.ConsumerGroupCommand.KafkaConsumerGroupService
+import kafka.admin.ConsumerGroupCommand.{ConsumerGroupCommandOptions, KafkaConsumerGroupService}
import kafka.common.TopicAndPartition
import kafka.log.LogConfig
import kafka.network.SocketServer
@@ -28,23 +27,21 @@ import kafka.security.auth._
import kafka.server.{BaseRequestTest, KafkaConfig}
import kafka.utils.TestUtils
import org.apache.kafka.clients.admin.NewPartitions
-import org.apache.kafka.clients.consumer.OffsetAndMetadata
-import org.apache.kafka.clients.consumer.internals.NoOpConsumerRebalanceListener
import org.apache.kafka.clients.consumer._
+import org.apache.kafka.clients.consumer.internals.NoOpConsumerRebalanceListener
import org.apache.kafka.clients.producer._
+import org.apache.kafka.common.acl.{AccessControlEntry, AccessControlEntryFilter, AclBinding, AclBindingFilter, AclOperation, AclPermissionType}
import org.apache.kafka.common.errors._
import org.apache.kafka.common.internals.Topic.GROUP_METADATA_TOPIC_NAME
-import org.apache.kafka.common.KafkaException
-import org.apache.kafka.common.protocol.{ApiKeys, Errors}
-import org.apache.kafka.common.requests.{Resource => RResource, ResourceType => RResourceType, _}
-import org.apache.kafka.common.acl.{AccessControlEntry, AccessControlEntryFilter, AclBinding, AclBindingFilter, AclOperation, AclPermissionType}
import org.apache.kafka.common.network.ListenerName
-import org.apache.kafka.common.record.{CompressionType, MemoryRecords, SimpleRecord}
+import org.apache.kafka.common.protocol.{ApiKeys, Errors}
+import org.apache.kafka.common.record.{CompressionType, MemoryRecords, Records, SimpleRecord}
import org.apache.kafka.common.requests.CreateAclsRequest.AclCreation
import org.apache.kafka.common.requests.CreateTopicsRequest.TopicDetails
+import org.apache.kafka.common.requests.{Resource => RResource, ResourceType => RResourceType, _}
import org.apache.kafka.common.resource.{ResourceFilter, Resource => AdminResource, ResourceType => AdminResourceType}
import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
-import org.apache.kafka.common.{Node, TopicPartition, requests}
+import org.apache.kafka.common.{KafkaException, Node, TopicPartition, requests}
import org.junit.Assert._
import org.junit.{After, Assert, Before, Test}
@@ -115,7 +112,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
val requestKeyToResponseDeserializer: Map[ApiKeys, Class[_ <: Any]] =
Map(ApiKeys.METADATA -> classOf[requests.MetadataResponse],
ApiKeys.PRODUCE -> classOf[requests.ProduceResponse],
- ApiKeys.FETCH -> classOf[requests.FetchResponse],
+ ApiKeys.FETCH -> classOf[requests.FetchResponse[Records]],
ApiKeys.LIST_OFFSETS -> classOf[requests.ListOffsetResponse],
ApiKeys.OFFSET_COMMIT -> classOf[requests.OffsetCommitResponse],
ApiKeys.OFFSET_FETCH -> classOf[requests.OffsetFetchResponse],
@@ -153,7 +150,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
val requestKeyToError = Map[ApiKeys, Nothing => Errors](
ApiKeys.METADATA -> ((resp: requests.MetadataResponse) => resp.errors.asScala.find(_._1 == topic).getOrElse(("test", Errors.NONE))._2),
ApiKeys.PRODUCE -> ((resp: requests.ProduceResponse) => resp.responses.asScala.find(_._1 == tp).get._2.error),
- ApiKeys.FETCH -> ((resp: requests.FetchResponse) => resp.responseData.asScala.find(_._1 == tp).get._2.error),
+ ApiKeys.FETCH -> ((resp: requests.FetchResponse[Records]) => resp.responseData.asScala.find(_._1 == tp).get._2.error),
ApiKeys.LIST_OFFSETS -> ((resp: requests.ListOffsetResponse) => resp.responseData.asScala.find(_._1 == tp).get._2.error),
ApiKeys.OFFSET_COMMIT -> ((resp: requests.OffsetCommitResponse) => resp.responseData.asScala.find(_._1 == tp).get._2),
ApiKeys.OFFSET_FETCH -> ((resp: requests.OffsetFetchResponse) => resp.error),
diff --git a/core/src/test/scala/unit/kafka/coordinator/AbstractCoordinatorConcurrencyTest.scala b/core/src/test/scala/unit/kafka/coordinator/AbstractCoordinatorConcurrencyTest.scala
index 0ecc3f5..d4dcd9f 100644
--- a/core/src/test/scala/unit/kafka/coordinator/AbstractCoordinatorConcurrencyTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/AbstractCoordinatorConcurrencyTest.scala
@@ -17,8 +17,8 @@
package kafka.coordinator
-import java.util.{ Collections, Random }
-import java.util.concurrent.{ ConcurrentHashMap, Executors }
+import java.util.{Collections, Random}
+import java.util.concurrent.{ConcurrentHashMap, Executors}
import java.util.concurrent.atomic.AtomicInteger
import java.util.concurrent.locks.Lock
@@ -30,10 +30,10 @@ import kafka.utils.timer.MockTimer
import kafka.zk.KafkaZkClient
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.protocol.Errors
-import org.apache.kafka.common.record.{ MemoryRecords, RecordBatch, RecordsProcessingStats }
+import org.apache.kafka.common.record.{MemoryRecords, RecordBatch, RecordConversionStats}
import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
import org.easymock.EasyMock
-import org.junit.{ After, Before }
+import org.junit.{After, Before}
import scala.collection._
import scala.collection.JavaConverters._
@@ -177,7 +177,7 @@ object AbstractCoordinatorConcurrencyTest {
entriesPerPartition: Map[TopicPartition, MemoryRecords],
responseCallback: Map[TopicPartition, PartitionResponse] => Unit,
delayedProduceLock: Option[Lock] = None,
- processingStatsCallback: Map[TopicPartition, RecordsProcessingStats] => Unit = _ => ()) {
+ processingStatsCallback: Map[TopicPartition, RecordConversionStats] => Unit = _ => ()) {
if (entriesPerPartition.isEmpty)
return
diff --git a/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala b/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala
index f68ff9e..2a367e0 100644
--- a/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala
@@ -63,7 +63,7 @@ class LogValidatorTest {
assertEquals(s"The offset of max timestamp should be 0", 0, validatedResults.shallowOffsetOfMaxTimestamp)
assertFalse("Message size should not have been changed", validatedResults.messageSizeMaybeChanged)
- verifyRecordsProcessingStats(validatedResults.recordsProcessingStats, numConvertedRecords = 0, records,
+ verifyRecordConversionStats(validatedResults.recordConversionStats, numConvertedRecords = 0, records,
compressed = false)
}
@@ -103,8 +103,8 @@ class LogValidatorTest {
records.records.asScala.size - 1, validatedResults.shallowOffsetOfMaxTimestamp)
assertTrue("Message size may have been changed", validatedResults.messageSizeMaybeChanged)
- val stats = validatedResults.recordsProcessingStats
- verifyRecordsProcessingStats(stats, numConvertedRecords = 3, records, compressed = true)
+ val stats = validatedResults.recordConversionStats
+ verifyRecordConversionStats(stats, numConvertedRecords = 3, records, compressed = true)
}
@Test
@@ -145,7 +145,7 @@ class LogValidatorTest {
records.records.asScala.size - 1, validatedResults.shallowOffsetOfMaxTimestamp)
assertFalse("Message size should not have been changed", validatedResults.messageSizeMaybeChanged)
- verifyRecordsProcessingStats(validatedResults.recordsProcessingStats, numConvertedRecords = 0, records,
+ verifyRecordConversionStats(validatedResults.recordConversionStats, numConvertedRecords = 0, records,
compressed = true)
}
@@ -255,7 +255,7 @@ class LogValidatorTest {
assertEquals(s"Offset of max timestamp should be 1", 1, validatingResults.shallowOffsetOfMaxTimestamp)
assertFalse("Message size should not have been changed", validatingResults.messageSizeMaybeChanged)
- verifyRecordsProcessingStats(validatingResults.recordsProcessingStats, numConvertedRecords = 0, records,
+ verifyRecordConversionStats(validatingResults.recordConversionStats, numConvertedRecords = 0, records,
compressed = false)
}
@@ -320,7 +320,7 @@ class LogValidatorTest {
assertEquals("Offset of max timestamp should be 2", 2, validatingResults.shallowOffsetOfMaxTimestamp)
assertTrue("Message size should have been changed", validatingResults.messageSizeMaybeChanged)
- verifyRecordsProcessingStats(validatingResults.recordsProcessingStats, numConvertedRecords = 3, records,
+ verifyRecordConversionStats(validatingResults.recordConversionStats, numConvertedRecords = 3, records,
compressed = true)
}
@@ -364,7 +364,7 @@ class LogValidatorTest {
validatedRecords.records.asScala.size - 1, validatedResults.shallowOffsetOfMaxTimestamp)
assertTrue("Message size should have been changed", validatedResults.messageSizeMaybeChanged)
- verifyRecordsProcessingStats(validatedResults.recordsProcessingStats, numConvertedRecords = 3, records,
+ verifyRecordConversionStats(validatedResults.recordConversionStats, numConvertedRecords = 3, records,
compressed = true)
}
@@ -405,7 +405,7 @@ class LogValidatorTest {
validatedRecords.records.asScala.size - 1, validatedResults.shallowOffsetOfMaxTimestamp)
assertTrue("Message size should have been changed", validatedResults.messageSizeMaybeChanged)
- verifyRecordsProcessingStats(validatedResults.recordsProcessingStats, numConvertedRecords = 3, records,
+ verifyRecordConversionStats(validatedResults.recordConversionStats, numConvertedRecords = 3, records,
compressed = true)
}
@@ -466,7 +466,7 @@ class LogValidatorTest {
validatedRecords.records.asScala.size - 1, validatedResults.shallowOffsetOfMaxTimestamp)
assertFalse("Message size should not have been changed", validatedResults.messageSizeMaybeChanged)
- verifyRecordsProcessingStats(validatedResults.recordsProcessingStats, numConvertedRecords = 0, records,
+ verifyRecordConversionStats(validatedResults.recordConversionStats, numConvertedRecords = 0, records,
compressed = true)
}
@@ -697,7 +697,7 @@ class LogValidatorTest {
partitionLeaderEpoch = RecordBatch.NO_PARTITION_LEADER_EPOCH,
isFromClient = true)
checkOffsets(validatedResults.validatedRecords, offset)
- verifyRecordsProcessingStats(validatedResults.recordsProcessingStats, numConvertedRecords = 3, records,
+ verifyRecordConversionStats(validatedResults.recordConversionStats, numConvertedRecords = 3, records,
compressed = false)
}
@@ -719,7 +719,7 @@ class LogValidatorTest {
partitionLeaderEpoch = RecordBatch.NO_PARTITION_LEADER_EPOCH,
isFromClient = true)
checkOffsets(validatedResults.validatedRecords, offset)
- verifyRecordsProcessingStats(validatedResults.recordsProcessingStats, numConvertedRecords = 3, records,
+ verifyRecordConversionStats(validatedResults.recordConversionStats, numConvertedRecords = 3, records,
compressed = false)
}
@@ -741,7 +741,7 @@ class LogValidatorTest {
partitionLeaderEpoch = RecordBatch.NO_PARTITION_LEADER_EPOCH,
isFromClient = true)
checkOffsets(validatedResults.validatedRecords, offset)
- verifyRecordsProcessingStats(validatedResults.recordsProcessingStats, numConvertedRecords = 3, records,
+ verifyRecordConversionStats(validatedResults.recordConversionStats, numConvertedRecords = 3, records,
compressed = true)
}
@@ -763,7 +763,7 @@ class LogValidatorTest {
partitionLeaderEpoch = RecordBatch.NO_PARTITION_LEADER_EPOCH,
isFromClient = true)
checkOffsets(validatedResults.validatedRecords, offset)
- verifyRecordsProcessingStats(validatedResults.recordsProcessingStats, numConvertedRecords = 3, records,
+ verifyRecordConversionStats(validatedResults.recordConversionStats, numConvertedRecords = 3, records,
compressed = true)
}
@@ -1131,8 +1131,8 @@ class LogValidatorTest {
}
}
- def verifyRecordsProcessingStats(stats: RecordsProcessingStats, numConvertedRecords: Int, records: MemoryRecords,
- compressed: Boolean): Unit = {
+ def verifyRecordConversionStats(stats: RecordConversionStats, numConvertedRecords: Int, records: MemoryRecords,
+ compressed: Boolean): Unit = {
assertNotNull("Records processing info is null", stats)
assertEquals(numConvertedRecords, stats.numRecordsConverted)
if (numConvertedRecords > 0) {
diff --git a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
index dfa388b..c0e27cf 100644
--- a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
+++ b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
@@ -20,13 +20,12 @@ package kafka.network
import java.io._
import java.net._
import java.nio.ByteBuffer
-import java.util.{HashMap, Random}
import java.nio.channels.SocketChannel
-import javax.net.ssl._
+import java.util.{HashMap, Random}
import com.yammer.metrics.core.{Gauge, Meter}
import com.yammer.metrics.{Metrics => YammerMetrics}
-import kafka.network.RequestChannel.{NoOpAction, ResponseAction, SendAction}
+import javax.net.ssl._
import kafka.security.CredentialProvider
import kafka.server.{KafkaConfig, ThrottledChannel}
import kafka.utils.TestUtils
@@ -42,7 +41,7 @@ import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
import org.apache.kafka.common.security.scram.internal.ScramMechanism
import org.apache.kafka.common.utils.{LogContext, MockTime, Time}
import org.apache.log4j.Level
-import org.junit.Assert.{assertEquals, _}
+import org.junit.Assert._
import org.junit._
import org.scalatest.junit.JUnitSuite
@@ -132,7 +131,7 @@ class SocketServerTest extends JUnitSuite {
byteBuffer.rewind()
val send = new NetworkSend(request.context.connectionId, byteBuffer)
- channel.sendResponse(new RequestChannel.Response(request, Some(send), SendAction, Some(request.header.toString)))
+ channel.sendResponse(new RequestChannel.SendResponse(request, send, Some(request.header.toString), None))
}
def connect(s: SocketServer = server, protocol: SecurityProtocol = SecurityProtocol.PLAINTEXT, localAddr: InetAddress = null) = {
@@ -215,7 +214,7 @@ class SocketServerTest extends JUnitSuite {
for (_ <- 0 until 10) {
val request = receiveRequest(server.requestChannel)
assertNotNull("receiveRequest timed out", request)
- server.requestChannel.sendResponse(new RequestChannel.Response(request, None, RequestChannel.NoOpAction, None))
+ server.requestChannel.sendResponse(new RequestChannel.NoOpResponse(request))
}
}
@@ -229,7 +228,7 @@ class SocketServerTest extends JUnitSuite {
for (_ <- 0 until 3) {
val request = receiveRequest(server.requestChannel)
assertNotNull("receiveRequest timed out", request)
- server.requestChannel.sendResponse(new RequestChannel.Response(request, None, RequestChannel.NoOpAction, None))
+ server.requestChannel.sendResponse(new RequestChannel.NoOpResponse(request))
}
}
@@ -397,7 +396,7 @@ class SocketServerTest extends JUnitSuite {
// Prepares test setup for throttled channel tests. noOpResponse selects a NoOpResponse instead of a SendResponse,
// and throttlingInProgress controls whether or not throttling has completed in the quota manager.
- def throttledChannelTestSetUp(socket: Socket, serializedBytes: Array[Byte], action: RequestChannel.ResponseAction,
+ def throttledChannelTestSetUp(socket: Socket, serializedBytes: Array[Byte], noOpResponse: Boolean,
throttlingInProgress: Boolean): RequestChannel.Request = {
sendRequest(socket, serializedBytes)
@@ -406,16 +405,21 @@ class SocketServerTest extends JUnitSuite {
val request = receiveRequest(server.requestChannel)
val byteBuffer = request.body[AbstractRequest].serialize(request.header)
val send = new NetworkSend(request.context.connectionId, byteBuffer)
- def channelThrottlingCallback(responseAction: ResponseAction): Unit = {
- server.requestChannel.sendResponse(new RequestChannel.Response(request, None, responseAction, None))
+ def channelThrottlingCallback(response: RequestChannel.Response): Unit = {
+ server.requestChannel.sendResponse(response)
}
- val throttledChannel = new ThrottledChannel(new MockTime(), 100, channelThrottlingCallback)
- server.requestChannel.sendResponse(new RequestChannel.Response(request, Some(send), action,
- Some(request.header.toString)))
+ val throttledChannel = new ThrottledChannel(request, new MockTime(), 100, channelThrottlingCallback)
+ val response =
+ if (!noOpResponse)
+ new RequestChannel.SendResponse(request, send, Some(request.header.toString), None)
+ else
+ new RequestChannel.NoOpResponse(request)
+ server.requestChannel.sendResponse(response)
// Quota manager would call notifyThrottlingDone() on throttling completion. Simulate it if throttlingInProgress is
// false.
- if (!throttlingInProgress) throttledChannel.notifyThrottlingDone()
+ if (!throttlingInProgress)
+ throttledChannel.notifyThrottlingDone()
request
}
@@ -428,7 +432,7 @@ class SocketServerTest extends JUnitSuite {
val socket = connect(protocol = SecurityProtocol.PLAINTEXT)
val serializedBytes = producerRequestBytes()
// SendResponse with throttling in progress
- val request = throttledChannelTestSetUp(socket, serializedBytes, SendAction, true)
+ val request = throttledChannelTestSetUp(socket, serializedBytes, false, true)
// receive response
assertEquals(serializedBytes.toSeq, receiveResponse(socket).toSeq)
@@ -442,7 +446,7 @@ class SocketServerTest extends JUnitSuite {
val socket = connect(protocol = SecurityProtocol.PLAINTEXT)
val serializedBytes = producerRequestBytes()
// SendResponse with throttling already done
- val request = throttledChannelTestSetUp(socket, serializedBytes, SendAction, false)
+ val request = throttledChannelTestSetUp(socket, serializedBytes, false, false)
// receive response
assertEquals(serializedBytes.toSeq, receiveResponse(socket).toSeq)
@@ -457,7 +461,7 @@ class SocketServerTest extends JUnitSuite {
val socket = connect(protocol = SecurityProtocol.PLAINTEXT)
val serializedBytes = producerRequestBytes()
// NoOpResponse with throttling in progress
- val request = throttledChannelTestSetUp(socket, serializedBytes, NoOpAction, true)
+ val request = throttledChannelTestSetUp(socket, serializedBytes, true, true)
TestUtils.waitUntilTrue(() => openOrClosingChannel(request).exists(c => c.muteState() == ChannelMuteState.MUTED_AND_THROTTLED), "fail")
// Channel should still be muted.
@@ -469,7 +473,7 @@ class SocketServerTest extends JUnitSuite {
val socket = connect(protocol = SecurityProtocol.PLAINTEXT)
val serializedBytes = producerRequestBytes()
// NoOpResponse with throttling already done
- val request = throttledChannelTestSetUp(socket, serializedBytes, NoOpAction, false)
+ val request = throttledChannelTestSetUp(socket, serializedBytes, true, false)
// Since throttling is already done, the channel can be unmuted.
TestUtils.waitUntilTrue(() => openOrClosingChannel(request).exists(c => c.muteState() == ChannelMuteState.NOT_MUTED), "fail")
@@ -675,7 +679,7 @@ class SocketServerTest extends JUnitSuite {
// detected. If the buffer is larger than 102400 bytes, a second write is attempted and it fails with an
// IOException.
val send = new NetworkSend(request.context.connectionId, ByteBuffer.allocate(550000))
- channel.sendResponse(new RequestChannel.Response(request, Some(send), SendAction, None))
+ channel.sendResponse(new RequestChannel.SendResponse(request, send, None, None))
TestUtils.waitUntilTrue(() => totalTimeHistCount() == expectedTotalTimeCount,
s"request metrics not updated, expected: $expectedTotalTimeCount, actual: ${totalTimeHistCount()}")
diff --git a/core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala
index b99bac8..c5275c2 100644
--- a/core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala
@@ -16,13 +16,22 @@
*/
package kafka.server
+import java.net.InetAddress
+import java.util
import java.util.Collections
-import kafka.network.RequestChannel.{EndThrottlingAction, ResponseAction, Session, StartThrottlingAction}
+import kafka.network.RequestChannel
+import kafka.network.RequestChannel.{EndThrottlingResponse, Session, StartThrottlingResponse}
import kafka.server.QuotaType._
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.memory.MemoryPool
import org.apache.kafka.common.metrics.{MetricConfig, Metrics, Quota}
-import org.apache.kafka.common.security.auth.KafkaPrincipal
+import org.apache.kafka.common.network.ListenerName
+import org.apache.kafka.common.requests.FetchRequest.PartitionData
+import org.apache.kafka.common.requests.{AbstractRequest, FetchRequest, RequestContext, RequestHeader}
+import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
import org.apache.kafka.common.utils.{MockTime, Sanitizer}
+import org.easymock.EasyMock
import org.junit.Assert.{assertEquals, assertTrue}
import org.junit.{Before, Test}
@@ -32,11 +41,11 @@ class ClientQuotaManagerTest {
private val config = ClientQuotaManagerConfig(quotaBytesPerSecondDefault = 500)
var numCallbacks: Int = 0
- def callback (responseAction: ResponseAction) {
+ def callback (response: RequestChannel.Response) {
// Count how many times this callback is called for notifyThrottlingDone().
- responseAction match {
- case StartThrottlingAction =>
- case EndThrottlingAction => numCallbacks += 1
+ response match {
+ case _: StartThrottlingResponse =>
+ case _: EndThrottlingResponse => numCallbacks += 1
}
}
@@ -45,15 +54,30 @@ class ClientQuotaManagerTest {
numCallbacks = 0
}
+ private def buildRequest[T <: AbstractRequest](builder: AbstractRequest.Builder[T],
+ listenerName: ListenerName = ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT)): (T, RequestChannel.Request) = {
+
+ val request = builder.build()
+ val buffer = request.serialize(new RequestHeader(builder.apiKey, request.version, "", 0))
+ val requestChannelMetrics = EasyMock.createNiceMock(classOf[RequestChannel.Metrics])
+
+ // read the header from the buffer first so that the body can be read next from the Request constructor
+ val header = RequestHeader.parse(buffer)
+ val context = new RequestContext(header, "1", InetAddress.getLocalHost, KafkaPrincipal.ANONYMOUS,
+ listenerName, SecurityProtocol.PLAINTEXT)
+ (request, new RequestChannel.Request(processor = 1, context = context, startTimeNanos = 0, MemoryPool.NONE, buffer,
+ requestChannelMetrics))
+ }
+
private def maybeRecord(quotaManager: ClientQuotaManager, user: String, clientId: String, value: Double): Int = {
val principal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, user)
- quotaManager.maybeRecordAndGetThrottleTimeMs(Session(principal, null),clientId, value, time.milliseconds())
+ quotaManager.maybeRecordAndGetThrottleTimeMs(Session(principal, null), clientId, value, time.milliseconds())
}
private def throttle(quotaManager: ClientQuotaManager, user: String, clientId: String, throttleTimeMs: Int,
- channelThrottlingCallback: (ResponseAction) => Unit) {
- val principal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, user)
- quotaManager.throttle(Session(principal, null),clientId, throttleTimeMs, channelThrottlingCallback)
+ channelThrottlingCallback: (RequestChannel.Response) => Unit) {
+ val (_, request) = buildRequest(FetchRequest.Builder.forConsumer(0, 1000, new util.HashMap[TopicPartition, PartitionData]))
+ quotaManager.throttle(request, throttleTimeMs, channelThrottlingCallback)
}
private def testQuotaParsing(config: ClientQuotaManagerConfig, client1: UserClient, client2: UserClient, randomClient: UserClient, defaultConfigClient: UserClient) {
@@ -364,6 +388,7 @@ class ClientQuotaManagerTest {
// the sensor should get recreated
val throttleTimeSensor = metrics.getSensor("ProduceThrottleTime-:client1")
assertTrue("Throttle time sensor should exist", throttleTimeSensor != null)
+ assertTrue("Throttle time sensor should exist", throttleTimeSensor != null)
} finally {
clientMetrics.shutdown()
}
diff --git a/core/src/test/scala/unit/kafka/server/FetchRequestTest.scala b/core/src/test/scala/unit/kafka/server/FetchRequestTest.scala
index 03137e1..424b8c7 100644
--- a/core/src/test/scala/unit/kafka/server/FetchRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/FetchRequestTest.scala
@@ -26,9 +26,8 @@ import kafka.utils.TestUtils
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.protocol.{ApiKeys, Errors}
-import org.apache.kafka.common.record.{Record, RecordBatch}
-import org.apache.kafka.common.requests.{FetchRequest, FetchResponse}
-import org.apache.kafka.common.requests.{FetchMetadata => JFetchMetadata}
+import org.apache.kafka.common.record.{MemoryRecords, Record, RecordBatch, Records}
+import org.apache.kafka.common.requests.{FetchRequest, FetchResponse, FetchMetadata => JFetchMetadata}
import org.apache.kafka.common.serialization.{ByteArraySerializer, StringSerializer}
import org.junit.Assert._
import org.junit.Test
@@ -64,7 +63,7 @@ class FetchRequestTest extends BaseRequestTest {
partitionMap
}
- private def sendFetchRequest(leaderId: Int, request: FetchRequest): FetchResponse = {
+ private def sendFetchRequest(leaderId: Int, request: FetchRequest): FetchResponse[MemoryRecords] = {
val response = connectAndSend(request, ApiKeys.FETCH, destination = brokerSocketServer(leaderId))
FetchResponse.parse(response, request.version)
}
@@ -219,7 +218,7 @@ class FetchRequestTest extends BaseRequestTest {
// batch is not complete, but sent when the producer is closed
futures.foreach(_.get)
- def fetch(version: Short, maxPartitionBytes: Int, closeAfterPartialResponse: Boolean): Option[FetchResponse] = {
+ def fetch(version: Short, maxPartitionBytes: Int, closeAfterPartialResponse: Boolean): Option[FetchResponse[MemoryRecords]] = {
val fetchRequest = FetchRequest.Builder.forConsumer(Int.MaxValue, 0, createPartitionMap(maxPartitionBytes,
Seq(topicPartition))).build(version)
@@ -279,17 +278,32 @@ class FetchRequestTest extends BaseRequestTest {
secondBatchFutures.foreach(_.get)
def check(fetchOffset: Long, requestVersion: Short, expectedOffset: Long, expectedNumBatches: Int, expectedMagic: Byte): Unit = {
- val fetchRequest = FetchRequest.Builder.forConsumer(Int.MaxValue, 0, createPartitionMap(Int.MaxValue,
- Seq(topicPartition), Map(topicPartition -> fetchOffset))).build(requestVersion)
- val fetchResponse = sendFetchRequest(leaderId, fetchRequest)
- val partitionData = fetchResponse.responseData.get(topicPartition)
- assertEquals(Errors.NONE, partitionData.error)
- assertTrue(partitionData.highWatermark > 0)
- val batches = partitionData.records.batches.asScala.toBuffer
- assertEquals(expectedNumBatches, batches.size)
- val batch = batches.head
- assertEquals(expectedMagic, batch.magic)
- assertEquals(expectedOffset, batch.baseOffset)
+ var batchesReceived = 0
+ var currentFetchOffset = fetchOffset
+ var currentExpectedOffset = expectedOffset
+
+ // With KIP-283, we might not receive all batches in a single fetch request so loop through till we have consumed
+ // all batches we are interested in.
+ while (batchesReceived < expectedNumBatches) {
+ val fetchRequest = FetchRequest.Builder.forConsumer(Int.MaxValue, 0, createPartitionMap(Int.MaxValue,
+ Seq(topicPartition), Map(topicPartition -> currentFetchOffset))).build(requestVersion)
+ val fetchResponse = sendFetchRequest(leaderId, fetchRequest)
+
+ // validate response
+ val partitionData = fetchResponse.responseData.get(topicPartition)
+ assertEquals(Errors.NONE, partitionData.error)
+ assertTrue(partitionData.highWatermark > 0)
+ val batches = partitionData.records.batches.asScala.toBuffer
+ val batch = batches.head
+ assertEquals(expectedMagic, batch.magic)
+ assertEquals(currentExpectedOffset, batch.baseOffset)
+
+ currentFetchOffset = batches.last.lastOffset + 1
+ currentExpectedOffset += (batches.last.lastOffset - batches.head.baseOffset + 1)
+ batchesReceived += batches.size
+ }
+
+ assertEquals(expectedNumBatches, batchesReceived)
}
// down conversion to message format 0, batches of 1 message are returned so we receive the exact offset we requested
@@ -317,9 +331,9 @@ class FetchRequestTest extends BaseRequestTest {
}
/**
- * Test that when an incremental fetch session contains partitions with an error,
- * those partitions are returned in all incremental fetch requests.
- */
+ * Test that when an incremental fetch session contains partitions with an error,
+ * those partitions are returned in all incremental fetch requests.
+ */
@Test
def testCreateIncrementalFetchWithPartitionsInError(): Unit = {
def createFetchRequest(topicPartitions: Seq[TopicPartition],
@@ -327,9 +341,9 @@ class FetchRequestTest extends BaseRequestTest {
toForget: Seq[TopicPartition]): FetchRequest =
FetchRequest.Builder.forConsumer(Int.MaxValue, 0,
createPartitionMap(Integer.MAX_VALUE, topicPartitions, Map.empty))
- .toForget(toForget.asJava)
- .metadata(metadata)
- .build()
+ .toForget(toForget.asJava)
+ .metadata(metadata)
+ .build()
val foo0 = new TopicPartition("foo", 0)
val foo1 = new TopicPartition("foo", 1)
createTopic("foo", Map(0 -> List(0, 1), 1 -> List(0, 2)))
@@ -370,11 +384,11 @@ class FetchRequestTest extends BaseRequestTest {
assertFalse(resp4.responseData().containsKey(bar0))
}
- private def records(partitionData: FetchResponse.PartitionData): Seq[Record] = {
+ private def records(partitionData: FetchResponse.PartitionData[MemoryRecords]): Seq[Record] = {
partitionData.records.records.asScala.toIndexedSeq
}
- private def checkFetchResponse(expectedPartitions: Seq[TopicPartition], fetchResponse: FetchResponse,
+ private def checkFetchResponse(expectedPartitions: Seq[TopicPartition], fetchResponse: FetchResponse[MemoryRecords],
maxPartitionBytes: Int, maxResponseBytes: Int, numMessagesPerPartition: Int): Unit = {
assertEquals(expectedPartitions, fetchResponse.responseData.keySet.asScala.toSeq)
var emptyResponseSeen = false
diff --git a/core/src/test/scala/unit/kafka/server/FetchSessionTest.scala b/core/src/test/scala/unit/kafka/server/FetchSessionTest.scala
index 8264c1b..84efa6b 100755
--- a/core/src/test/scala/unit/kafka/server/FetchSessionTest.scala
+++ b/core/src/test/scala/unit/kafka/server/FetchSessionTest.scala
@@ -22,11 +22,12 @@ import java.util.Collections
import kafka.utils.MockTime
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.record.{AbstractRecords, Records}
import org.apache.kafka.common.requests.FetchMetadata.{FINAL_EPOCH, INVALID_SESSION_ID}
import org.apache.kafka.common.requests.{FetchRequest, FetchResponse, FetchMetadata => JFetchMetadata}
-import org.junit.{Rule, Test}
import org.junit.Assert._
import org.junit.rules.Timeout
+import org.junit.{Rule, Test}
class FetchSessionTest {
@Rule
@@ -152,7 +153,7 @@ class FetchSessionTest {
})
assertEquals(0, context2.getFetchOffset(new TopicPartition("foo", 0)).get)
assertEquals(10, context2.getFetchOffset(new TopicPartition("foo", 1)).get)
- val respData2 = new util.LinkedHashMap[TopicPartition, FetchResponse.PartitionData]
+ val respData2 = new util.LinkedHashMap[TopicPartition, FetchResponse.PartitionData[Records]]
respData2.put(new TopicPartition("foo", 0), new FetchResponse.PartitionData(
Errors.NONE, 100, 100, 100, null, null))
respData2.put(new TopicPartition("foo", 1), new FetchResponse.PartitionData(
@@ -211,7 +212,7 @@ class FetchSessionTest {
new JFetchMetadata(prevSessionId, FINAL_EPOCH), reqData7, EMPTY_PART_LIST, false)
assertEquals(classOf[SessionlessFetchContext], context7.getClass)
assertEquals(0, cache.size())
- val respData7 = new util.LinkedHashMap[TopicPartition, FetchResponse.PartitionData]
+ val respData7 = new util.LinkedHashMap[TopicPartition, FetchResponse.PartitionData[Records]]
respData7.put(new TopicPartition("bar", 0),
new FetchResponse.PartitionData(Errors.NONE, 100, 100, 100, null, null))
respData7.put(new TopicPartition("bar", 1),
@@ -234,7 +235,7 @@ class FetchSessionTest {
reqData1.put(new TopicPartition("foo", 1), new FetchRequest.PartitionData(10, 0, 100))
val context1 = fetchManager.newContext(JFetchMetadata.INITIAL, reqData1, EMPTY_PART_LIST, false)
assertEquals(classOf[FullFetchContext], context1.getClass)
- val respData1 = new util.LinkedHashMap[TopicPartition, FetchResponse.PartitionData]
+ val respData1 = new util.LinkedHashMap[TopicPartition, FetchResponse.PartitionData[Records]]
respData1.put(new TopicPartition("foo", 0), new FetchResponse.PartitionData(
Errors.NONE, 100, 100, 100, null, null))
respData1.put(new TopicPartition("foo", 1), new FetchResponse.PartitionData(
@@ -261,7 +262,7 @@ class FetchSessionTest {
assertEquals(10, context2.getFetchOffset(new TopicPartition("foo", 1)).get)
assertEquals(15, context2.getFetchOffset(new TopicPartition("bar", 0)).get)
assertEquals(None, context2.getFetchOffset(new TopicPartition("bar", 2)))
- val respData2 = new util.LinkedHashMap[TopicPartition, FetchResponse.PartitionData]
+ val respData2 = new util.LinkedHashMap[TopicPartition, FetchResponse.PartitionData[Records]]
respData2.put(new TopicPartition("foo", 1), new FetchResponse.PartitionData(
Errors.NONE, 10, 10, 10, null, null))
respData2.put(new TopicPartition("bar", 0), new FetchResponse.PartitionData(
@@ -284,7 +285,7 @@ class FetchSessionTest {
reqData1.put(new TopicPartition("foo", 1), new FetchRequest.PartitionData(10, 0, 100))
val context1 = fetchManager.newContext(JFetchMetadata.INITIAL, reqData1, EMPTY_PART_LIST, false)
assertEquals(classOf[FullFetchContext], context1.getClass)
- val respData1 = new util.LinkedHashMap[TopicPartition, FetchResponse.PartitionData]
+ val respData1 = new util.LinkedHashMap[TopicPartition, FetchResponse.PartitionData[Records]]
respData1.put(new TopicPartition("foo", 0), new FetchResponse.PartitionData(
Errors.NONE, 100, 100, 100, null, null))
respData1.put(new TopicPartition("foo", 1), new FetchResponse.PartitionData(
@@ -303,7 +304,7 @@ class FetchSessionTest {
val context2 = fetchManager.newContext(
new JFetchMetadata(resp1.sessionId(), 1), reqData2, removed2, false)
assertEquals(classOf[SessionlessFetchContext], context2.getClass)
- val respData2 = new util.LinkedHashMap[TopicPartition, FetchResponse.PartitionData]
+ val respData2 = new util.LinkedHashMap[TopicPartition, FetchResponse.PartitionData[Records]]
val resp2 = context2.updateAndGenerateResponseData(respData2)
assertEquals(INVALID_SESSION_ID, resp2.sessionId())
assertTrue(resp2.responseData().isEmpty)
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index 1d6092a..d880011 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -29,6 +29,7 @@ import kafka.coordinator.group.GroupCoordinator
import kafka.coordinator.transaction.TransactionCoordinator
import kafka.log.{Log, TimestampOffset}
import kafka.network.RequestChannel
+import kafka.network.RequestChannel.SendResponse
import kafka.security.auth.Authorizer
import kafka.server.QuotaFactory.QuotaManagers
import kafka.utils.{MockTime, TestUtils}
@@ -542,7 +543,10 @@ class KafkaApisTest {
}
private def readResponse(api: ApiKeys, request: AbstractRequest, capturedResponse: Capture[RequestChannel.Response]): AbstractResponse = {
- val send = capturedResponse.getValue.responseSend.get
+ val response = capturedResponse.getValue
+ assertTrue(s"Unexpected response type: ${response.getClass}", response.isInstanceOf[SendResponse])
+ val sendResponse = response.asInstanceOf[SendResponse]
+ val send = sendResponse.responseSend
val channel = new ByteBufferChannel(send.size)
send.writeTo(channel)
channel.close()
@@ -556,7 +560,7 @@ class KafkaApisTest {
EasyMock.expect(clientRequestQuotaManager.maybeRecordAndGetThrottleTimeMs(EasyMock.anyObject[RequestChannel.Request]()))
.andReturn(0)
EasyMock.expect(clientRequestQuotaManager.throttle(EasyMock.anyObject[RequestChannel.Request](), EasyMock.eq(0),
- EasyMock.anyObject[RequestChannel.ResponseAction => Unit]()))
+ EasyMock.anyObject[RequestChannel.Response => Unit]()))
val capturedResponse = EasyMock.newCapture[RequestChannel.Response]()
EasyMock.expect(requestChannel.sendResponse(EasyMock.capture(capturedResponse)))
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaAlterLogDirsThreadTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaAlterLogDirsThreadTest.scala
index dcbeb21..29a1c9f 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaAlterLogDirsThreadTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaAlterLogDirsThreadTest.scala
@@ -18,13 +18,13 @@ package kafka.server
import kafka.api.Request
-import kafka.cluster.{BrokerEndPoint, Replica, Partition}
+import kafka.cluster.{BrokerEndPoint, Partition, Replica}
import kafka.log.LogManager
import kafka.server.AbstractFetcherThread.ResultWithPartitions
import kafka.server.epoch.LeaderEpochCache
-import org.apache.kafka.common.errors.{ReplicaNotAvailableException, KafkaStorageException}
import kafka.utils.{DelayedItem, TestUtils}
import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.errors.{KafkaStorageException, ReplicaNotAvailableException}
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.requests.EpochEndOffset
import org.apache.kafka.common.requests.EpochEndOffset.{UNDEFINED_EPOCH_OFFSET, UNDEFINED_EPOCH}
diff --git a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
index e62c9fd..3f2f66c 100644
--- a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
+++ b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
@@ -413,7 +413,7 @@ class RequestQuotaTest extends BaseRequestTest {
private def responseThrottleTime(apiKey: ApiKeys, response: Struct): Int = {
apiKey match {
case ApiKeys.PRODUCE => new ProduceResponse(response).throttleTimeMs
- case ApiKeys.FETCH => new FetchResponse(response).throttleTimeMs
+ case ApiKeys.FETCH => FetchResponse.parse(response).throttleTimeMs
case ApiKeys.LIST_OFFSETS => new ListOffsetResponse(response).throttleTimeMs
case ApiKeys.METADATA => new MetadataResponse(response).throttleTimeMs
case ApiKeys.OFFSET_COMMIT => new OffsetCommitResponse(response).throttleTimeMs
diff --git a/core/src/test/scala/unit/kafka/server/ThrottledChannelExpirationTest.scala b/core/src/test/scala/unit/kafka/server/ThrottledChannelExpirationTest.scala
index 8ba584c..ff781a2 100644
--- a/core/src/test/scala/unit/kafka/server/ThrottledChannelExpirationTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ThrottledChannelExpirationTest.scala
@@ -18,12 +18,22 @@
package kafka.server
+import java.net.InetAddress
+import java.util
import java.util.Collections
import java.util.concurrent.{DelayQueue, TimeUnit}
-import kafka.network.RequestChannel.{EndThrottlingAction, ResponseAction, StartThrottlingAction}
+import kafka.network.RequestChannel
+import kafka.network.RequestChannel.{EndThrottlingResponse, Response, StartThrottlingResponse}
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.memory.MemoryPool
import org.apache.kafka.common.metrics.MetricConfig
+import org.apache.kafka.common.network.ListenerName
+import org.apache.kafka.common.requests.FetchRequest.PartitionData
+import org.apache.kafka.common.requests.{AbstractRequest, FetchRequest, RequestContext, RequestHeader}
+import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
import org.apache.kafka.common.utils.MockTime
+import org.easymock.EasyMock
import org.junit.{Assert, Before, Test}
class ThrottledChannelExpirationTest {
@@ -33,11 +43,27 @@ class ThrottledChannelExpirationTest {
private val metrics = new org.apache.kafka.common.metrics.Metrics(new MetricConfig(),
Collections.emptyList(),
time)
+ private val request = buildRequest(FetchRequest.Builder.forConsumer(0, 1000, new util.HashMap[TopicPartition, PartitionData]))._2
- def callback(responseAction: ResponseAction): Unit = {
- responseAction match {
- case StartThrottlingAction => numCallbacksForStartThrottling += 1
- case EndThrottlingAction => numCallbacksForEndThrottling += 1
+ private def buildRequest[T <: AbstractRequest](builder: AbstractRequest.Builder[T],
+ listenerName: ListenerName = ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT)): (T, RequestChannel.Request) = {
+
+ val request = builder.build()
+ val buffer = request.serialize(new RequestHeader(builder.apiKey, request.version, "", 0))
+ val requestChannelMetrics = EasyMock.createNiceMock(classOf[RequestChannel.Metrics])
+
+ // read the header from the buffer first so that the body can be read next from the Request constructor
+ val header = RequestHeader.parse(buffer)
+ val context = new RequestContext(header, "1", InetAddress.getLocalHost, KafkaPrincipal.ANONYMOUS,
+ listenerName, SecurityProtocol.PLAINTEXT)
+ (request, new RequestChannel.Request(processor = 1, context = context, startTimeNanos = 0, MemoryPool.NONE, buffer,
+ requestChannelMetrics))
+ }
+
+ def callback(response: Response): Unit = {
+ response match {
+ case _: StartThrottlingResponse => numCallbacksForStartThrottling += 1
+ case _: EndThrottlingResponse => numCallbacksForEndThrottling += 1
}
}
@@ -55,10 +81,10 @@ class ThrottledChannelExpirationTest {
val reaper = new clientMetrics.ThrottledChannelReaper(delayQueue, "")
try {
// Add 4 elements to the queue out of order. Add 2 elements with the same expire timestamp.
- val channel1 = new ThrottledChannel(time, 10, callback)
- val channel2 = new ThrottledChannel(time, 30, callback)
- val channel3 = new ThrottledChannel(time, 30, callback)
- val channel4 = new ThrottledChannel(time, 20, callback)
+ val channel1 = new ThrottledChannel(request, time, 10, callback)
+ val channel2 = new ThrottledChannel(request, time, 30, callback)
+ val channel3 = new ThrottledChannel(request, time, 30, callback)
+ val channel4 = new ThrottledChannel(request, time, 20, callback)
delayQueue.add(channel1)
delayQueue.add(channel2)
delayQueue.add(channel3)
@@ -82,9 +108,9 @@ class ThrottledChannelExpirationTest {
@Test
def testThrottledChannelDelay() {
- val t1: ThrottledChannel = new ThrottledChannel(time, 10, callback)
- val t2: ThrottledChannel = new ThrottledChannel(time, 20, callback)
- val t3: ThrottledChannel = new ThrottledChannel(time, 20, callback)
+ val t1: ThrottledChannel = new ThrottledChannel(request, time, 10, callback)
+ val t2: ThrottledChannel = new ThrottledChannel(request, time, 20, callback)
+ val t3: ThrottledChannel = new ThrottledChannel(request, time, 20, callback)
Assert.assertEquals(10, t1.throttleTimeMs)
Assert.assertEquals(20, t2.throttleTimeMs)
Assert.assertEquals(20, t3.throttleTimeMs)
diff --git a/core/src/test/scala/unit/kafka/server/epoch/util/ReplicaFetcherMockBlockingSend.scala b/core/src/test/scala/unit/kafka/server/epoch/util/ReplicaFetcherMockBlockingSend.scala
index 50a4d74..b7c037e 100644
--- a/core/src/test/scala/unit/kafka/server/epoch/util/ReplicaFetcherMockBlockingSend.scala
+++ b/core/src/test/scala/unit/kafka/server/epoch/util/ReplicaFetcherMockBlockingSend.scala
@@ -19,12 +19,12 @@ package kafka.server.epoch.util
import kafka.cluster.BrokerEndPoint
import kafka.server.BlockingSend
import org.apache.kafka.clients.{ClientRequest, ClientResponse, MockClient}
-import org.apache.kafka.common.{Node, TopicPartition}
import org.apache.kafka.common.protocol.{ApiKeys, Errors}
+import org.apache.kafka.common.record.Records
import org.apache.kafka.common.requests.AbstractRequest.Builder
-import org.apache.kafka.common.requests.FetchResponse.PartitionData
import org.apache.kafka.common.requests.{AbstractRequest, EpochEndOffset, FetchResponse, OffsetsForLeaderEpochResponse, FetchMetadata => JFetchMetadata}
import org.apache.kafka.common.utils.{SystemTime, Time}
+import org.apache.kafka.common.{Node, TopicPartition}
/**
* Stub network client used for testing the ReplicaFetcher, wraps the MockClient used for consumer testing
@@ -66,7 +66,7 @@ class ReplicaFetcherMockBlockingSend(offsets: java.util.Map[TopicPartition, Epoc
case ApiKeys.FETCH =>
fetchCount += 1
- new FetchResponse(Errors.NONE, new java.util.LinkedHashMap[TopicPartition, PartitionData], 0,
+ new FetchResponse(Errors.NONE, new java.util.LinkedHashMap[TopicPartition, FetchResponse.PartitionData[Records]], 0,
JFetchMetadata.INVALID_SESSION_ID)
case _ =>
--
To stop receiving notification emails like this one, please contact
jgus@apache.org.