You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ij...@apache.org on 2021/04/14 21:40:55 UTC
[kafka] branch trunk updated: KAFKA-12612: Remove `checksum` from
ConsumerRecord/RecordMetadata for 3.0 (#10470)
This is an automated email from the ASF dual-hosted git repository.
ijuma pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 89933f2 KAFKA-12612: Remove `checksum` from ConsumerRecord/RecordMetadata for 3.0 (#10470)
89933f2 is described below
commit 89933f21f204abf75336464d3ac24a4fdd254628
Author: Ismael Juma <is...@juma.me.uk>
AuthorDate: Wed Apr 14 14:38:37 2021 -0700
KAFKA-12612: Remove `checksum` from ConsumerRecord/RecordMetadata for 3.0 (#10470)
The methods have been deprecated since 0.11 without replacement since
message format 2 moved the checksum to the record batch (instead of the
record).
Unfortunately, we did not deprecate the constructors that take a checksum
(even though we intended to) so we cannot remove them. I have deprecated
them for removal in 4.0 and added a single non-deprecated constructor to
`ConsumerRecord` and `RecordMetadata` that takes all remaining parameters.
`ConsumerRecord` could do with one additional convenience constructor, but
that requires a KIP and hence should be done separately.
Also:
* Removed `ChecksumMessageFormatter`, which is technically not public
API, but may have been used with the console consumer.
* Updated all usages of `ConsumerRecord`/`RecordMetadata` constructors
to use the non-deprecated ones.
* Added tests for deprecated `ConsumerRecord`/`RecordMetadata`
constructors.
Reviewers: Chia-Ping Tsai <ch...@gmail.com>, David Jacot <dj...@confluent.io>
---
.../kafka/clients/consumer/ConsumerRecord.java | 115 ++++++++++-------
.../kafka/clients/consumer/internals/Fetcher.java | 2 +-
.../kafka/clients/producer/KafkaProducer.java | 2 +-
.../kafka/clients/producer/MockProducer.java | 10 +-
.../kafka/clients/producer/RecordMetadata.java | 53 ++++----
.../producer/internals/FutureRecordMetadata.java | 12 +-
.../clients/producer/internals/ProducerBatch.java | 6 +-
.../producer/internals/ProducerInterceptors.java | 2 +-
.../apache/kafka/common/record/DefaultRecord.java | 12 --
.../kafka/common/record/MemoryRecordsBuilder.java | 79 +++++-------
.../kafka/clients/consumer/ConsumerRecordTest.java | 68 ++++++++--
.../clients/consumer/ConsumerRecordsTest.java | 11 +-
.../kafka/clients/consumer/MockConsumerTest.java | 13 +-
.../internals/ConsumerInterceptorsTest.java | 14 ++-
.../kafka/clients/producer/RecordMetadataTest.java | 47 ++++---
.../kafka/clients/producer/RecordSendTest.java | 6 +-
.../internals/FutureRecordMetadataTest.java | 1 -
.../producer/internals/ProducerBatchTest.java | 26 ----
.../internals/ProducerInterceptorsTest.java | 2 +-
.../common/record/MemoryRecordsBuilderTest.java | 3 +-
.../apache/kafka/test/MockConsumerInterceptor.java | 10 +-
.../kafka/connect/mirror/MirrorSourceTaskTest.java | 7 +-
.../runtime/errors/WorkerErrantRecordReporter.java | 5 +-
.../kafka/connect/runtime/WorkerSinkTaskTest.java | 14 ++-
.../runtime/WorkerSinkTaskThreadedTest.java | 15 ++-
.../connect/runtime/WorkerSourceTaskTest.java | 2 +-
.../runtime/errors/ProcessingContextTest.java | 2 +-
.../storage/KafkaConfigBackingStoreTest.java | 138 ++++++++++++++-------
.../storage/KafkaOffsetBackingStoreTest.java | 38 ++++--
.../storage/KafkaStatusBackingStoreTest.java | 4 +-
.../kafka/connect/util/KafkaBasedLogTest.java | 32 +++--
.../main/scala/kafka/tools/ConsoleConsumer.scala | 22 +---
.../kafka/api/BaseProducerSendTest.scala | 3 -
.../kafka/tools/DefaultMessageFormatterTest.scala | 5 +-
.../unit/kafka/tools/ConsoleConsumerTest.scala | 25 +---
docs/upgrade.html | 14 ++-
.../processor/internals/RecordDeserializer.java | 6 +-
.../streams/state/internals/RecordConverters.java | 1 -
.../LogAndSkipOnInvalidTimestampTest.java | 8 +-
.../streams/processor/TimestampExtractorTest.java | 8 +-
.../processor/internals/GlobalStateTaskTest.java | 4 +-
.../internals/RecordDeserializerTest.java | 8 +-
.../processor/internals/RecordQueueTest.java | 74 +++++++----
.../processor/internals/StreamTaskTest.java | 17 ++-
.../processor/internals/StreamThreadTest.java | 26 ++--
.../internals/testutil/ConsumerRecordUtil.java | 8 +-
.../state/internals/RecordConvertersTest.java | 5 +-
.../internals/TimeOrderedKeyValueBufferTest.java | 114 +++++++++--------
.../org/apache/kafka/test/MockRestoreConsumer.java | 5 +-
.../apache/kafka/streams/TopologyTestDriver.java | 9 +-
.../kafka/streams/test/ConsumerRecordFactory.java | 5 +-
.../apache/kafka/streams/test/TestRecordTest.java | 5 +-
52 files changed, 636 insertions(+), 487 deletions(-)
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecord.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecord.java
index a7dad7b..2ae93e8 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecord.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecord.java
@@ -18,7 +18,6 @@ package org.apache.kafka.clients.consumer;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeaders;
-import org.apache.kafka.common.record.DefaultRecord;
import org.apache.kafka.common.record.RecordBatch;
import org.apache.kafka.common.record.TimestampType;
@@ -32,6 +31,12 @@ import java.util.Optional;
public class ConsumerRecord<K, V> {
public static final long NO_TIMESTAMP = RecordBatch.NO_TIMESTAMP;
public static final int NULL_SIZE = -1;
+
+ /**
+ * @deprecated checksums are no longer exposed by this class, this constant will be removed in Apache Kafka 4.0
+ * (deprecated since 3.0).
+ */
+ @Deprecated
public static final int NULL_CHECKSUM = -1;
private final String topic;
@@ -46,8 +51,6 @@ public class ConsumerRecord<K, V> {
private final V value;
private final Optional<Integer> leaderEpoch;
- private volatile Long checksum;
-
/**
* Creates a record to be received from a specified topic and partition (provided for
* compatibility with Kafka 0.9 before the message format supported timestamps and before
@@ -64,8 +67,52 @@ public class ConsumerRecord<K, V> {
long offset,
K key,
V value) {
- this(topic, partition, offset, NO_TIMESTAMP, TimestampType.NO_TIMESTAMP_TYPE,
- NULL_CHECKSUM, NULL_SIZE, NULL_SIZE, key, value);
+ this(topic, partition, offset, NO_TIMESTAMP, TimestampType.NO_TIMESTAMP_TYPE, NULL_SIZE, NULL_SIZE, key, value,
+ new RecordHeaders(), Optional.empty());
+ }
+
+ /**
+ * Creates a record to be received from a specified topic and partition
+ *
+ * @param topic The topic this record is received from
+ * @param partition The partition of the topic this record is received from
+ * @param offset The offset of this record in the corresponding Kafka partition
+ * @param timestamp The timestamp of the record.
+ * @param timestampType The timestamp type
+ * @param serializedKeySize The length of the serialized key
+ * @param serializedValueSize The length of the serialized value
+ * @param key The key of the record, if one exists (null is allowed)
+ * @param value The record contents
+ * @param headers The headers of the record
+ * @param leaderEpoch Optional leader epoch of the record (may be empty for legacy record formats)
+ */
+ public ConsumerRecord(String topic,
+ int partition,
+ long offset,
+ long timestamp,
+ TimestampType timestampType,
+ int serializedKeySize,
+ int serializedValueSize,
+ K key,
+ V value,
+ Headers headers,
+ Optional<Integer> leaderEpoch) {
+ if (topic == null)
+ throw new IllegalArgumentException("Topic cannot be null");
+ if (headers == null)
+ throw new IllegalArgumentException("Headers cannot be null");
+
+ this.topic = topic;
+ this.partition = partition;
+ this.offset = offset;
+ this.timestamp = timestamp;
+ this.timestampType = timestampType;
+ this.serializedKeySize = serializedKeySize;
+ this.serializedValueSize = serializedValueSize;
+ this.key = key;
+ this.value = value;
+ this.headers = headers;
+ this.leaderEpoch = leaderEpoch;
}
/**
@@ -77,12 +124,15 @@ public class ConsumerRecord<K, V> {
* @param offset The offset of this record in the corresponding Kafka partition
* @param timestamp The timestamp of the record.
* @param timestampType The timestamp type
- * @param checksum The checksum (CRC32) of the full record
* @param serializedKeySize The length of the serialized key
* @param serializedValueSize The length of the serialized value
* @param key The key of the record, if one exists (null is allowed)
* @param value The record contents
+ *
+ * @deprecated use one of the constructors without a `checksum` parameter. This constructor will be removed in
+ * Apache Kafka 4.0 (deprecated since 3.0).
*/
+ @Deprecated
public ConsumerRecord(String topic,
int partition,
long offset,
@@ -93,8 +143,8 @@ public class ConsumerRecord<K, V> {
int serializedValueSize,
K key,
V value) {
- this(topic, partition, offset, timestamp, timestampType, checksum, serializedKeySize, serializedValueSize,
- key, value, new RecordHeaders());
+ this(topic, partition, offset, timestamp, timestampType, serializedKeySize, serializedValueSize,
+ key, value, new RecordHeaders(), Optional.empty());
}
/**
@@ -105,13 +155,16 @@ public class ConsumerRecord<K, V> {
* @param offset The offset of this record in the corresponding Kafka partition
* @param timestamp The timestamp of the record.
* @param timestampType The timestamp type
- * @param checksum The checksum (CRC32) of the full record
* @param serializedKeySize The length of the serialized key
* @param serializedValueSize The length of the serialized value
* @param key The key of the record, if one exists (null is allowed)
* @param value The record contents
* @param headers The headers of the record.
+ *
+ * @deprecated use one of the constructors without a `checksum` parameter. This constructor will be removed in
+ * Apache Kafka 4.0 (deprecated since 3.0).
*/
+ @Deprecated
public ConsumerRecord(String topic,
int partition,
long offset,
@@ -123,7 +176,7 @@ public class ConsumerRecord<K, V> {
K key,
V value,
Headers headers) {
- this(topic, partition, offset, timestamp, timestampType, checksum, serializedKeySize, serializedValueSize,
+ this(topic, partition, offset, timestamp, timestampType, serializedKeySize, serializedValueSize,
key, value, headers, Optional.empty());
}
@@ -135,14 +188,17 @@ public class ConsumerRecord<K, V> {
* @param offset The offset of this record in the corresponding Kafka partition
* @param timestamp The timestamp of the record.
* @param timestampType The timestamp type
- * @param checksum The checksum (CRC32) of the full record
* @param serializedKeySize The length of the serialized key
* @param serializedValueSize The length of the serialized value
* @param key The key of the record, if one exists (null is allowed)
* @param value The record contents
* @param headers The headers of the record
* @param leaderEpoch Optional leader epoch of the record (may be empty for legacy record formats)
+ *
+ * @deprecated use one of the constructors without a `checksum` parameter. This constructor will be removed in
+ * Apache Kafka 4.0 (deprecated since 3.0).
*/
+ @Deprecated
public ConsumerRecord(String topic,
int partition,
long offset,
@@ -155,23 +211,8 @@ public class ConsumerRecord<K, V> {
V value,
Headers headers,
Optional<Integer> leaderEpoch) {
- if (topic == null)
- throw new IllegalArgumentException("Topic cannot be null");
- if (headers == null)
- throw new IllegalArgumentException("Headers cannot be null");
-
- this.topic = topic;
- this.partition = partition;
- this.offset = offset;
- this.timestamp = timestamp;
- this.timestampType = timestampType;
- this.checksum = checksum;
- this.serializedKeySize = serializedKeySize;
- this.serializedValueSize = serializedValueSize;
- this.key = key;
- this.value = value;
- this.headers = headers;
- this.leaderEpoch = leaderEpoch;
+ this(topic, partition, offset, timestamp, timestampType, serializedKeySize, serializedValueSize, key, value, headers,
+ leaderEpoch);
}
/**
@@ -231,24 +272,6 @@ public class ConsumerRecord<K, V> {
}
/**
- * The checksum (CRC32) of the record.
- *
- * @deprecated As of Kafka 0.11.0. Because of the potential for message format conversion on the broker, the
- * checksum returned by the broker may not match what was computed by the producer.
- * It is therefore unsafe to depend on this checksum for end-to-end delivery guarantees. Additionally,
- * message format v2 does not include a record-level checksum (for performance, the record checksum
- * was replaced with a batch checksum). To maintain compatibility, a partial checksum computed from
- * the record timestamp, serialized key size, and serialized value size is returned instead, but
- * this should not be depended on for end-to-end reliability.
- */
- @Deprecated
- public long checksum() {
- if (checksum == null)
- this.checksum = DefaultRecord.computePartialChecksum(timestamp, serializedKeySize, serializedValueSize);
- return this.checksum;
- }
-
- /**
* The size of the serialized, uncompressed key in bytes. If key is null, the returned size
* is -1.
*/
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
index 8f9b7a2..fa214ff 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
@@ -1385,7 +1385,7 @@ public class Fetcher<K, V> implements Closeable {
byte[] valueByteArray = valueBytes == null ? null : Utils.toArray(valueBytes);
V value = valueBytes == null ? null : this.valueDeserializer.deserialize(partition.topic(), headers, valueByteArray);
return new ConsumerRecord<>(partition.topic(), partition.partition(), offset,
- timestamp, timestampType, record.checksumOrNull(),
+ timestamp, timestampType,
keyByteArray == null ? ConsumerRecord.NULL_SIZE : keyByteArray.length,
valueByteArray == null ? ConsumerRecord.NULL_SIZE : valueByteArray.length,
key, value, headers, leaderEpoch);
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index 0b01a86..7f573d9 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -1359,7 +1359,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
}
public void onCompletion(RecordMetadata metadata, Exception exception) {
- metadata = metadata != null ? metadata : new RecordMetadata(tp, -1, -1, RecordBatch.NO_TIMESTAMP, -1L, -1, -1);
+ metadata = metadata != null ? metadata : new RecordMetadata(tp, -1, -1, RecordBatch.NO_TIMESTAMP, -1, -1);
this.interceptors.onAcknowledgement(metadata, exception);
if (this.userCallback != null)
this.userCallback.onCompletion(metadata, exception);
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java
index 650b697..f733869 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java
@@ -311,10 +311,12 @@ public class MockProducer<K, V> implements Producer<K, V> {
TopicPartition topicPartition = new TopicPartition(record.topic(), partition);
ProduceRequestResult result = new ProduceRequestResult(topicPartition);
FutureRecordMetadata future = new FutureRecordMetadata(result, 0, RecordBatch.NO_TIMESTAMP,
- 0L, 0, 0, Time.SYSTEM);
+ 0, 0, Time.SYSTEM);
long offset = nextOffset(topicPartition);
- Completion completion = new Completion(offset, new RecordMetadata(topicPartition, 0, offset,
- RecordBatch.NO_TIMESTAMP, 0L, 0, 0), result, callback, topicPartition);
+ long baseOffset = Math.max(0, offset - Integer.MAX_VALUE);
+ int batchIndex = (int) Math.min(Integer.MAX_VALUE, offset);
+ Completion completion = new Completion(offset, new RecordMetadata(topicPartition, baseOffset, batchIndex,
+ RecordBatch.NO_TIMESTAMP, 0, 0), result, callback, topicPartition);
if (!this.transactionInFlight)
this.sent.add(record);
@@ -537,7 +539,7 @@ public class MockProducer<K, V> implements Producer<K, V> {
if (e == null)
callback.onCompletion(metadata, null);
else
- callback.onCompletion(new RecordMetadata(tp, -1, -1, RecordBatch.NO_TIMESTAMP, -1L, -1, -1), e);
+ callback.onCompletion(new RecordMetadata(tp, -1, -1, RecordBatch.NO_TIMESTAMP, -1, -1), e);
}
result.done();
}
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/RecordMetadata.java b/clients/src/main/java/org/apache/kafka/clients/producer/RecordMetadata.java
index 1ae5cf6..8efc4c4 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/RecordMetadata.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/RecordMetadata.java
@@ -17,7 +17,6 @@
package org.apache.kafka.clients.producer;
import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.record.DefaultRecord;
import org.apache.kafka.common.record.RecordBatch;
import org.apache.kafka.common.requests.ProduceResponse;
@@ -42,21 +41,38 @@ public final class RecordMetadata {
private final int serializedValueSize;
private final TopicPartition topicPartition;
- private volatile Long checksum;
-
- public RecordMetadata(TopicPartition topicPartition, long baseOffset, long relativeOffset, long timestamp,
- Long checksum, int serializedKeySize, int serializedValueSize) {
- // ignore the relativeOffset if the base offset is -1,
- // since this indicates the offset is unknown
- this.offset = baseOffset == -1 ? baseOffset : baseOffset + relativeOffset;
+ /**
+ * Creates a new instance with the provided parameters.
+ */
+ public RecordMetadata(TopicPartition topicPartition, long baseOffset, int batchIndex, long timestamp,
+ int serializedKeySize, int serializedValueSize) {
+ // ignore the batchIndex if the base offset is -1, since this indicates the offset is unknown
+ this.offset = baseOffset == -1 ? baseOffset : baseOffset + batchIndex;
this.timestamp = timestamp;
- this.checksum = checksum;
this.serializedKeySize = serializedKeySize;
this.serializedValueSize = serializedValueSize;
this.topicPartition = topicPartition;
}
/**
+ * Creates a new instance with the provided parameters.
+ *
+ * @deprecated use constructor without `checksum` parameter. This constructor will be removed in
+ * Apache Kafka 4.0 (deprecated since 3.0).
+ */
+ @Deprecated
+ public RecordMetadata(TopicPartition topicPartition, long baseOffset, long batchIndex, long timestamp,
+ Long checksum, int serializedKeySize, int serializedValueSize) {
+ this(topicPartition, baseOffset, batchIndexToInt(batchIndex), timestamp, serializedKeySize, serializedValueSize);
+ }
+
+ private static int batchIndexToInt(long batchIndex) {
+ if (batchIndex > Integer.MAX_VALUE)
+ throw new IllegalArgumentException("batchIndex is larger than Integer.MAX_VALUE: " + batchIndex);
+ return (int) batchIndex;
+ }
+
+ /**
* Indicates whether the record metadata includes the offset.
* @return true if the offset is included in the metadata, false otherwise.
*/
@@ -90,25 +106,6 @@ public final class RecordMetadata {
}
/**
- * The checksum (CRC32) of the record.
- *
- * @deprecated As of Kafka 0.11.0. Because of the potential for message format conversion on the broker, the
- * computed checksum may not match what was stored on the broker, or what will be returned to the consumer.
- * It is therefore unsafe to depend on this checksum for end-to-end delivery guarantees. Additionally,
- * message format v2 does not include a record-level checksum (for performance, the record checksum
- * was replaced with a batch checksum). To maintain compatibility, a partial checksum computed from
- * the record timestamp, serialized key size, and serialized value size is returned instead, but
- * this should not be depended on for end-to-end reliability.
- */
- @Deprecated
- public long checksum() {
- if (checksum == null)
- // The checksum is null only for message format v2 and above, which do not have a record-level checksum.
- this.checksum = DefaultRecord.computePartialChecksum(timestamp, serializedKeySize, serializedValueSize);
- return this.checksum;
- }
-
- /**
* The size of the serialized, uncompressed key in bytes. If key is null, the returned size
* is -1.
*/
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/FutureRecordMetadata.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/FutureRecordMetadata.java
index a9665a9..0026237 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/FutureRecordMetadata.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/FutureRecordMetadata.java
@@ -32,18 +32,16 @@ public final class FutureRecordMetadata implements Future<RecordMetadata> {
private final ProduceRequestResult result;
private final int batchIndex;
private final long createTimestamp;
- private final Long checksum;
private final int serializedKeySize;
private final int serializedValueSize;
private final Time time;
private volatile FutureRecordMetadata nextRecordMetadata = null;
- public FutureRecordMetadata(ProduceRequestResult result, int batchIndex, long createTimestamp,
- Long checksum, int serializedKeySize, int serializedValueSize, Time time) {
+ public FutureRecordMetadata(ProduceRequestResult result, int batchIndex, long createTimestamp, int serializedKeySize,
+ int serializedValueSize, Time time) {
this.result = result;
this.batchIndex = batchIndex;
this.createTimestamp = createTimestamp;
- this.checksum = checksum;
this.serializedKeySize = serializedKeySize;
this.serializedValueSize = serializedValueSize;
this.time = time;
@@ -101,15 +99,11 @@ public final class FutureRecordMetadata implements Future<RecordMetadata> {
return value();
}
- Long checksumOrNull() {
- return this.checksum;
- }
-
RecordMetadata value() {
if (nextRecordMetadata != null)
return nextRecordMetadata.value();
return new RecordMetadata(result.topicPartition(), this.result.baseOffset(), this.batchIndex,
- timestamp(), this.checksum, this.serializedKeySize, this.serializedValueSize);
+ timestamp(), this.serializedKeySize, this.serializedValueSize);
}
private long timestamp() {
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java
index 6f76c79..4da0362 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java
@@ -106,12 +106,12 @@ public final class ProducerBatch {
if (!recordsBuilder.hasRoomFor(timestamp, key, value, headers)) {
return null;
} else {
- Long checksum = this.recordsBuilder.append(timestamp, key, value, headers);
+ this.recordsBuilder.append(timestamp, key, value, headers);
this.maxRecordSize = Math.max(this.maxRecordSize, AbstractRecords.estimateSizeInBytesUpperBound(magic(),
recordsBuilder.compressionType(), key, value, headers));
this.lastAppendTime = now;
FutureRecordMetadata future = new FutureRecordMetadata(this.produceFuture, this.recordCount,
- timestamp, checksum,
+ timestamp,
key == null ? -1 : key.length,
value == null ? -1 : value.length,
Time.SYSTEM);
@@ -136,7 +136,7 @@ public final class ProducerBatch {
this.maxRecordSize = Math.max(this.maxRecordSize, AbstractRecords.estimateSizeInBytesUpperBound(magic(),
recordsBuilder.compressionType(), key, value, headers));
FutureRecordMetadata future = new FutureRecordMetadata(this.produceFuture, this.recordCount,
- timestamp, thunk.future.checksumOrNull(),
+ timestamp,
key == null ? -1 : key.remaining(),
value == null ? -1 : value.remaining(),
Time.SYSTEM);
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerInterceptors.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerInterceptors.java
index 61a8b7a..ceec552 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerInterceptors.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerInterceptors.java
@@ -114,7 +114,7 @@ public class ProducerInterceptors<K, V> implements Closeable {
record.partition() == null ? RecordMetadata.UNKNOWN_PARTITION : record.partition());
}
interceptor.onAcknowledgement(new RecordMetadata(interceptTopicPartition, -1, -1,
- RecordBatch.NO_TIMESTAMP, Long.valueOf(-1L), -1, -1), exception);
+ RecordBatch.NO_TIMESTAMP, -1, -1), exception);
}
} catch (Exception e) {
// do not propagate interceptor exceptions, just log
diff --git a/clients/src/main/java/org/apache/kafka/common/record/DefaultRecord.java b/clients/src/main/java/org/apache/kafka/common/record/DefaultRecord.java
index d85f100..bbaed94 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/DefaultRecord.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/DefaultRecord.java
@@ -20,8 +20,6 @@ import org.apache.kafka.common.InvalidRecordException;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.kafka.common.utils.ByteUtils;
-import org.apache.kafka.common.utils.Checksums;
-import org.apache.kafka.common.utils.Crc32C;
import org.apache.kafka.common.utils.PrimitiveRef;
import org.apache.kafka.common.utils.PrimitiveRef.IntRef;
import org.apache.kafka.common.utils.Utils;
@@ -33,7 +31,6 @@ import java.nio.BufferUnderflowException;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Objects;
-import java.util.zip.Checksum;
import static org.apache.kafka.common.record.RecordBatch.MAGIC_VALUE_V2;
@@ -632,13 +629,4 @@ public class DefaultRecord implements Record {
int valueSize = value == null ? -1 : value.remaining();
return MAX_RECORD_OVERHEAD + sizeOf(keySize, valueSize, headers);
}
-
-
- public static long computePartialChecksum(long timestamp, int serializedKeySize, int serializedValueSize) {
- Checksum checksum = Crc32C.create();
- Checksums.updateLong(checksum, timestamp);
- Checksums.updateInt(checksum, serializedKeySize);
- Checksums.updateInt(checksum, serializedValueSize);
- return checksum.getValue();
- }
}
diff --git a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
index 91edf2c..59ad210 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
@@ -396,9 +396,9 @@ public class MemoryRecordsBuilder implements AutoCloseable {
}
/**
- * Append a record and return its checksum for message format v0 and v1, or null for v2 and above.
+ * Append a new record at the given offset.
*/
- private Long appendWithOffset(long offset, boolean isControlRecord, long timestamp, ByteBuffer key,
+ private void appendWithOffset(long offset, boolean isControlRecord, long timestamp, ByteBuffer key,
ByteBuffer value, Header[] headers) {
try {
if (isControlRecord != isControlBatch)
@@ -419,9 +419,8 @@ public class MemoryRecordsBuilder implements AutoCloseable {
if (magic > RecordBatch.MAGIC_VALUE_V1) {
appendDefaultRecord(offset, timestamp, key, value, headers);
- return null;
} else {
- return appendLegacyRecord(offset, timestamp, key, value, magic);
+ appendLegacyRecord(offset, timestamp, key, value, magic);
}
} catch (IOException e) {
throw new KafkaException("I/O exception when writing to the append stream, closing", e);
@@ -435,10 +434,9 @@ public class MemoryRecordsBuilder implements AutoCloseable {
* @param key The record key
* @param value The record value
* @param headers The record headers if there are any
- * @return CRC of the record or null if record-level CRC is not supported for the message format
*/
- public Long appendWithOffset(long offset, long timestamp, byte[] key, byte[] value, Header[] headers) {
- return appendWithOffset(offset, false, timestamp, wrapNullable(key), wrapNullable(value), headers);
+ public void appendWithOffset(long offset, long timestamp, byte[] key, byte[] value, Header[] headers) {
+ appendWithOffset(offset, false, timestamp, wrapNullable(key), wrapNullable(value), headers);
}
/**
@@ -448,10 +446,9 @@ public class MemoryRecordsBuilder implements AutoCloseable {
* @param key The record key
* @param value The record value
* @param headers The record headers if there are any
- * @return CRC of the record or null if record-level CRC is not supported for the message format
*/
- public Long appendWithOffset(long offset, long timestamp, ByteBuffer key, ByteBuffer value, Header[] headers) {
- return appendWithOffset(offset, false, timestamp, key, value, headers);
+ public void appendWithOffset(long offset, long timestamp, ByteBuffer key, ByteBuffer value, Header[] headers) {
+ appendWithOffset(offset, false, timestamp, key, value, headers);
}
/**
@@ -460,10 +457,9 @@ public class MemoryRecordsBuilder implements AutoCloseable {
* @param timestamp The record timestamp
* @param key The record key
* @param value The record value
- * @return CRC of the record or null if record-level CRC is not supported for the message format
*/
- public Long appendWithOffset(long offset, long timestamp, byte[] key, byte[] value) {
- return appendWithOffset(offset, timestamp, wrapNullable(key), wrapNullable(value), Record.EMPTY_HEADERS);
+ public void appendWithOffset(long offset, long timestamp, byte[] key, byte[] value) {
+ appendWithOffset(offset, timestamp, wrapNullable(key), wrapNullable(value), Record.EMPTY_HEADERS);
}
/**
@@ -472,20 +468,18 @@ public class MemoryRecordsBuilder implements AutoCloseable {
* @param timestamp The record timestamp
* @param key The record key
* @param value The record value
- * @return CRC of the record or null if record-level CRC is not supported for the message format
*/
- public Long appendWithOffset(long offset, long timestamp, ByteBuffer key, ByteBuffer value) {
- return appendWithOffset(offset, timestamp, key, value, Record.EMPTY_HEADERS);
+ public void appendWithOffset(long offset, long timestamp, ByteBuffer key, ByteBuffer value) {
+ appendWithOffset(offset, timestamp, key, value, Record.EMPTY_HEADERS);
}
/**
* Append a new record at the given offset.
* @param offset The absolute offset of the record in the log buffer
* @param record The record to append
- * @return CRC of the record or null if record-level CRC is not supported for the message format
*/
- public Long appendWithOffset(long offset, SimpleRecord record) {
- return appendWithOffset(offset, record.timestamp(), record.key(), record.value(), record.headers());
+ public void appendWithOffset(long offset, SimpleRecord record) {
+ appendWithOffset(offset, record.timestamp(), record.key(), record.value(), record.headers());
}
/**
@@ -494,15 +488,14 @@ public class MemoryRecordsBuilder implements AutoCloseable {
*
* @param offset The absolute offset of the record in the log buffer
* @param record The record to append
- * @return CRC of the record or null if record-level CRC is not supported for the message format
*/
- public Long appendControlRecordWithOffset(long offset, SimpleRecord record) {
+ public void appendControlRecordWithOffset(long offset, SimpleRecord record) {
short typeId = ControlRecordType.parseTypeId(record.key());
ControlRecordType type = ControlRecordType.fromTypeId(typeId);
if (type == ControlRecordType.UNKNOWN)
throw new IllegalArgumentException("Cannot append record with unknown control record type " + typeId);
- return appendWithOffset(offset, true, record.timestamp(),
+ appendWithOffset(offset, true, record.timestamp(),
record.key(), record.value(), record.headers());
}
@@ -511,10 +504,9 @@ public class MemoryRecordsBuilder implements AutoCloseable {
* @param timestamp The record timestamp
* @param key The record key
* @param value The record value
- * @return CRC of the record or null if record-level CRC is not supported for the message format
*/
- public Long append(long timestamp, ByteBuffer key, ByteBuffer value) {
- return append(timestamp, key, value, Record.EMPTY_HEADERS);
+ public void append(long timestamp, ByteBuffer key, ByteBuffer value) {
+ append(timestamp, key, value, Record.EMPTY_HEADERS);
}
/**
@@ -525,8 +517,7 @@ public class MemoryRecordsBuilder implements AutoCloseable {
* @param headers The record headers if there are any
- * @return CRC of the record or null if record-level CRC is not supported for the message format
*/
- public Long append(long timestamp, ByteBuffer key, ByteBuffer value, Header[] headers) {
- return appendWithOffset(nextSequentialOffset(), timestamp, key, value, headers);
+ public void append(long timestamp, ByteBuffer key, ByteBuffer value, Header[] headers) {
+ appendWithOffset(nextSequentialOffset(), timestamp, key, value, headers);
}
/**
@@ -536,8 +528,7 @@ public class MemoryRecordsBuilder implements AutoCloseable {
* @param value The record value
- * @return CRC of the record or null if record-level CRC is not supported for the message format
*/
- public Long append(long timestamp, byte[] key, byte[] value) {
- return append(timestamp, wrapNullable(key), wrapNullable(value), Record.EMPTY_HEADERS);
+ public void append(long timestamp, byte[] key, byte[] value) {
+ append(timestamp, wrapNullable(key), wrapNullable(value), Record.EMPTY_HEADERS);
}
/**
@@ -546,19 +538,17 @@ public class MemoryRecordsBuilder implements AutoCloseable {
* @param key The record key
* @param value The record value
* @param headers The record headers if there are any
- * @return CRC of the record or null if record-level CRC is not supported for the message format
*/
- public Long append(long timestamp, byte[] key, byte[] value, Header[] headers) {
- return append(timestamp, wrapNullable(key), wrapNullable(value), headers);
+ public void append(long timestamp, byte[] key, byte[] value, Header[] headers) {
+ append(timestamp, wrapNullable(key), wrapNullable(value), headers);
}
/**
* Append a new record at the next sequential offset.
* @param record The record to append
- * @return CRC of the record or null if record-level CRC is not supported for the message format
*/
- public Long append(SimpleRecord record) {
- return appendWithOffset(nextSequentialOffset(), record);
+ public void append(SimpleRecord record) {
+ appendWithOffset(nextSequentialOffset(), record);
}
/**
@@ -566,36 +556,29 @@ public class MemoryRecordsBuilder implements AutoCloseable {
* @param timestamp The record timestamp
* @param type The control record type (cannot be UNKNOWN)
* @param value The control record value
- * @return CRC of the record or null if record-level CRC is not supported for the message format
*/
- private Long appendControlRecord(long timestamp, ControlRecordType type, ByteBuffer value) {
+ private void appendControlRecord(long timestamp, ControlRecordType type, ByteBuffer value) {
Struct keyStruct = type.recordKey();
ByteBuffer key = ByteBuffer.allocate(keyStruct.sizeOf());
keyStruct.writeTo(key);
key.flip();
- return appendWithOffset(nextSequentialOffset(), true, timestamp, key, value, Record.EMPTY_HEADERS);
+ appendWithOffset(nextSequentialOffset(), true, timestamp, key, value, Record.EMPTY_HEADERS);
}
- /**
- * Return CRC of the record or null if record-level CRC is not supported for the message format
- */
- public Long appendEndTxnMarker(long timestamp, EndTransactionMarker marker) {
+ public void appendEndTxnMarker(long timestamp, EndTransactionMarker marker) {
if (producerId == RecordBatch.NO_PRODUCER_ID)
throw new IllegalArgumentException("End transaction marker requires a valid producerId");
if (!isTransactional)
throw new IllegalArgumentException("End transaction marker depends on batch transactional flag being enabled");
ByteBuffer value = marker.serializeValue();
- return appendControlRecord(timestamp, marker.controlType(), value);
+ appendControlRecord(timestamp, marker.controlType(), value);
}
- /**
- * Return CRC of the record or null if record-level CRC is not supported for the message format
- */
- public Long appendLeaderChangeMessage(long timestamp, LeaderChangeMessage leaderChangeMessage) {
+ public void appendLeaderChangeMessage(long timestamp, LeaderChangeMessage leaderChangeMessage) {
if (partitionLeaderEpoch == RecordBatch.NO_PARTITION_LEADER_EPOCH) {
throw new IllegalArgumentException("Partition leader epoch must be valid, but get " + partitionLeaderEpoch);
}
- return appendControlRecord(timestamp, ControlRecordType.LEADER_CHANGE,
+ appendControlRecord(timestamp, ControlRecordType.LEADER_CHANGE,
MessageUtil.toByteBuffer(leaderChangeMessage, ControlRecordUtils.LEADER_CHANGE_SCHEMA_VERSION));
}
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerRecordTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerRecordTest.java
index 2dc1dde..848f75c 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerRecordTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerRecordTest.java
@@ -16,11 +16,12 @@
*/
package org.apache.kafka.clients.consumer;
+import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.kafka.common.header.internals.RecordHeaders;
-import org.apache.kafka.common.record.DefaultRecord;
import org.apache.kafka.common.record.TimestampType;
import org.junit.jupiter.api.Test;
+import java.nio.charset.StandardCharsets;
import java.util.Optional;
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -28,8 +29,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
public class ConsumerRecordTest {
@Test
- @SuppressWarnings("deprecation")
- public void testOldConstructor() {
+ public void testShortConstructor() {
String topic = "topic";
int partition = 0;
long offset = 23;
@@ -44,7 +44,6 @@ public class ConsumerRecordTest {
assertEquals(value, record.value());
assertEquals(TimestampType.NO_TIMESTAMP_TYPE, record.timestampType());
assertEquals(ConsumerRecord.NO_TIMESTAMP, record.timestamp());
- assertEquals(ConsumerRecord.NULL_CHECKSUM, record.checksum());
assertEquals(ConsumerRecord.NULL_SIZE, record.serializedKeySize());
assertEquals(ConsumerRecord.NULL_SIZE, record.serializedValueSize());
assertEquals(Optional.empty(), record.leaderEpoch());
@@ -52,14 +51,63 @@ public class ConsumerRecordTest {
}
@Test
- @SuppressWarnings("deprecation")
- public void testNullChecksumInConstructor() {
+ @Deprecated
+ public void testConstructorsWithChecksum() {
+ String topic = "topic";
+ int partition = 0;
+ long offset = 23;
+ long timestamp = 23434217432432L;
+ TimestampType timestampType = TimestampType.CREATE_TIME;
String key = "key";
String value = "value";
- long timestamp = 242341324L;
- ConsumerRecord<String, String> record = new ConsumerRecord<>("topic", 0, 23L, timestamp,
- TimestampType.CREATE_TIME, null, key.length(), value.length(), key, value, new RecordHeaders());
- assertEquals(DefaultRecord.computePartialChecksum(timestamp, key.length(), value.length()), record.checksum());
+ long checksum = 50L;
+ int serializedKeySize = 100;
+ int serializedValueSize = 1142;
+
+ ConsumerRecord<String, String> record = new ConsumerRecord<>(topic, partition, offset, timestamp, timestampType,
+ checksum, serializedKeySize, serializedValueSize, key, value);
+ assertEquals(topic, record.topic());
+ assertEquals(partition, record.partition());
+ assertEquals(offset, record.offset());
+ assertEquals(key, record.key());
+ assertEquals(value, record.value());
+ assertEquals(timestampType, record.timestampType());
+ assertEquals(timestamp, record.timestamp());
+ assertEquals(serializedKeySize, record.serializedKeySize());
+ assertEquals(serializedValueSize, record.serializedValueSize());
+ assertEquals(Optional.empty(), record.leaderEpoch());
+ assertEquals(new RecordHeaders(), record.headers());
+
+ RecordHeaders headers = new RecordHeaders();
+ headers.add(new RecordHeader("header key", "header value".getBytes(StandardCharsets.UTF_8)));
+ record = new ConsumerRecord<>(topic, partition, offset, timestamp, timestampType,
+ checksum, serializedKeySize, serializedValueSize, key, value, headers);
+ assertEquals(topic, record.topic());
+ assertEquals(partition, record.partition());
+ assertEquals(offset, record.offset());
+ assertEquals(key, record.key());
+ assertEquals(value, record.value());
+ assertEquals(timestampType, record.timestampType());
+ assertEquals(timestamp, record.timestamp());
+ assertEquals(serializedKeySize, record.serializedKeySize());
+ assertEquals(serializedValueSize, record.serializedValueSize());
+ assertEquals(Optional.empty(), record.leaderEpoch());
+ assertEquals(headers, record.headers());
+
+ Optional<Integer> leaderEpoch = Optional.of(10);
+ record = new ConsumerRecord<>(topic, partition, offset, timestamp, timestampType,
+ checksum, serializedKeySize, serializedValueSize, key, value, headers, leaderEpoch);
+ assertEquals(topic, record.topic());
+ assertEquals(partition, record.partition());
+ assertEquals(offset, record.offset());
+ assertEquals(key, record.key());
+ assertEquals(value, record.value());
+ assertEquals(timestampType, record.timestampType());
+ assertEquals(timestamp, record.timestamp());
+ assertEquals(serializedKeySize, record.serializedKeySize());
+ assertEquals(serializedValueSize, record.serializedValueSize());
+ assertEquals(leaderEpoch, record.leaderEpoch());
+ assertEquals(headers, record.headers());
}
}
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerRecordsTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerRecordsTest.java
index e34b6b1..d414450 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerRecordsTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerRecordsTest.java
@@ -24,7 +24,10 @@ import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
+
import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.record.TimestampType;
import org.junit.jupiter.api.Test;
@@ -37,10 +40,12 @@ public class ConsumerRecordsTest {
String topic = "topic";
records.put(new TopicPartition(topic, 0), new ArrayList<ConsumerRecord<Integer, String>>());
- ConsumerRecord<Integer, String> record1 = new ConsumerRecord<>(topic, 1, 0, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, 1, "value1");
- ConsumerRecord<Integer, String> record2 = new ConsumerRecord<>(topic, 1, 1, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, 2, "value2");
+ ConsumerRecord<Integer, String> record1 = new ConsumerRecord<>(topic, 1, 0, 0L, TimestampType.CREATE_TIME,
+ 0, 0, 1, "value1", new RecordHeaders(), Optional.empty());
+ ConsumerRecord<Integer, String> record2 = new ConsumerRecord<>(topic, 1, 1, 0L, TimestampType.CREATE_TIME,
+ 0, 0, 2, "value2", new RecordHeaders(), Optional.empty());
records.put(new TopicPartition(topic, 1), Arrays.asList(record1, record2));
- records.put(new TopicPartition(topic, 2), new ArrayList<ConsumerRecord<Integer, String>>());
+ records.put(new TopicPartition(topic, 2), new ArrayList<>());
ConsumerRecords<Integer, String> consumerRecords = new ConsumerRecords<>(records);
Iterator<ConsumerRecord<Integer, String>> iter = consumerRecords.iterator();
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java
index 9a74b06..be2ea0e 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java
@@ -17,6 +17,7 @@
package org.apache.kafka.clients.consumer;
import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.record.TimestampType;
import org.junit.jupiter.api.Test;
@@ -47,8 +48,10 @@ public class MockConsumerTest {
beginningOffsets.put(new TopicPartition("test", 1), 0L);
consumer.updateBeginningOffsets(beginningOffsets);
consumer.seek(new TopicPartition("test", 0), 0);
- ConsumerRecord<String, String> rec1 = new ConsumerRecord<>("test", 0, 0, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, "key1", "value1");
- ConsumerRecord<String, String> rec2 = new ConsumerRecord<>("test", 0, 1, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, "key2", "value2");
+ ConsumerRecord<String, String> rec1 = new ConsumerRecord<>("test", 0, 0, 0L, TimestampType.CREATE_TIME,
+ 0, 0, "key1", "value1", new RecordHeaders(), Optional.empty());
+ ConsumerRecord<String, String> rec2 = new ConsumerRecord<>("test", 0, 1, 0L, TimestampType.CREATE_TIME,
+ 0, 0, "key2", "value2", new RecordHeaders(), Optional.empty());
consumer.addRecord(rec1);
consumer.addRecord(rec2);
ConsumerRecords<String, String> recs = consumer.poll(Duration.ofMillis(1));
@@ -74,8 +77,10 @@ public class MockConsumerTest {
beginningOffsets.put(new TopicPartition("test", 1), 0L);
consumer.updateBeginningOffsets(beginningOffsets);
consumer.seek(new TopicPartition("test", 0), 0);
- ConsumerRecord<String, String> rec1 = new ConsumerRecord<>("test", 0, 0, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, "key1", "value1");
- ConsumerRecord<String, String> rec2 = new ConsumerRecord<>("test", 0, 1, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, "key2", "value2");
+ ConsumerRecord<String, String> rec1 = new ConsumerRecord<>("test", 0, 0, 0L, TimestampType.CREATE_TIME,
+ 0, 0, "key1", "value1", new RecordHeaders(), Optional.empty());
+ ConsumerRecord<String, String> rec2 = new ConsumerRecord<>("test", 0, 1, 0L, TimestampType.CREATE_TIME,
+ 0, 0, "key2", "value2", new RecordHeaders(), Optional.empty());
consumer.addRecord(rec1);
consumer.addRecord(rec2);
ConsumerRecords<String, String> recs = consumer.poll(1);
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerInterceptorsTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerInterceptorsTest.java
index 1fad0c5..19ac256 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerInterceptorsTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerInterceptorsTest.java
@@ -23,6 +23,7 @@ import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.record.TimestampType;
import org.junit.jupiter.api.Test;
@@ -30,6 +31,7 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.HashMap;
+import java.util.Optional;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
@@ -43,8 +45,8 @@ public class ConsumerInterceptorsTest {
private final TopicPartition tp = new TopicPartition(topic, partition);
private final TopicPartition filterTopicPart1 = new TopicPartition("test5", filterPartition1);
private final TopicPartition filterTopicPart2 = new TopicPartition("test6", filterPartition2);
- private final ConsumerRecord<Integer, Integer> consumerRecord =
- new ConsumerRecord<>(topic, partition, 0, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, 1, 1);
+ private final ConsumerRecord<Integer, Integer> consumerRecord = new ConsumerRecord<>(topic, partition, 0, 0L,
+ TimestampType.CREATE_TIME, 0, 0, 1, 1, new RecordHeaders(), Optional.empty());
private int onCommitCount = 0;
private int onConsumeCount = 0;
@@ -117,11 +119,11 @@ public class ConsumerInterceptorsTest {
List<ConsumerRecord<Integer, Integer>> list1 = new ArrayList<>();
list1.add(consumerRecord);
List<ConsumerRecord<Integer, Integer>> list2 = new ArrayList<>();
- list2.add(new ConsumerRecord<>(filterTopicPart1.topic(), filterTopicPart1.partition(), 0,
- 0L, TimestampType.CREATE_TIME, 0L, 0, 0, 1, 1));
+ list2.add(new ConsumerRecord<>(filterTopicPart1.topic(), filterTopicPart1.partition(), 0, 0L,
+ TimestampType.CREATE_TIME, 0, 0, 1, 1, new RecordHeaders(), Optional.empty()));
List<ConsumerRecord<Integer, Integer>> list3 = new ArrayList<>();
- list3.add(new ConsumerRecord<>(filterTopicPart2.topic(), filterTopicPart2.partition(), 0,
- 0L, TimestampType.CREATE_TIME, 0L, 0, 0, 1, 1));
+ list3.add(new ConsumerRecord<>(filterTopicPart2.topic(), filterTopicPart2.partition(), 0, 0L, TimestampType.CREATE_TIME,
+ 0, 0, 1, 1, new RecordHeaders(), Optional.empty()));
records.put(tp, list1);
records.put(filterTopicPart1, list2);
records.put(filterTopicPart2, list3);
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/RecordMetadataTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/RecordMetadataTest.java
index 51764dc..b6207fb 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/RecordMetadataTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/RecordMetadataTest.java
@@ -17,7 +17,6 @@
package org.apache.kafka.clients.producer;
import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.record.DefaultRecord;
import org.junit.jupiter.api.Test;
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -26,56 +25,66 @@ import static org.junit.jupiter.api.Assertions.assertFalse;
public class RecordMetadataTest {
@Test
- @SuppressWarnings("deprecation")
- public void testConstructionWithMissingRelativeOffset() {
+ public void testConstructionWithMissingBatchIndex() {
TopicPartition tp = new TopicPartition("foo", 0);
long timestamp = 2340234L;
int keySize = 3;
int valueSize = 5;
- Long checksum = 908923L;
- RecordMetadata metadata = new RecordMetadata(tp, -1L, -1L, timestamp, checksum, keySize, valueSize);
+ RecordMetadata metadata = new RecordMetadata(tp, -1L, -1, timestamp, keySize, valueSize);
assertEquals(tp.topic(), metadata.topic());
assertEquals(tp.partition(), metadata.partition());
assertEquals(timestamp, metadata.timestamp());
assertFalse(metadata.hasOffset());
assertEquals(-1L, metadata.offset());
- assertEquals(checksum.longValue(), metadata.checksum());
assertEquals(keySize, metadata.serializedKeySize());
assertEquals(valueSize, metadata.serializedValueSize());
}
@Test
- @SuppressWarnings("deprecation")
- public void testConstructionWithRelativeOffset() {
+ public void testConstructionWithBatchIndexOffset() {
TopicPartition tp = new TopicPartition("foo", 0);
long timestamp = 2340234L;
int keySize = 3;
int valueSize = 5;
long baseOffset = 15L;
- long relativeOffset = 3L;
- Long checksum = 908923L;
+ int batchIndex = 3;
- RecordMetadata metadata = new RecordMetadata(tp, baseOffset, relativeOffset, timestamp, checksum,
- keySize, valueSize);
+ RecordMetadata metadata = new RecordMetadata(tp, baseOffset, batchIndex, timestamp, keySize, valueSize);
assertEquals(tp.topic(), metadata.topic());
assertEquals(tp.partition(), metadata.partition());
assertEquals(timestamp, metadata.timestamp());
- assertEquals(baseOffset + relativeOffset, metadata.offset());
- assertEquals(checksum.longValue(), metadata.checksum());
+ assertEquals(baseOffset + batchIndex, metadata.offset());
assertEquals(keySize, metadata.serializedKeySize());
assertEquals(valueSize, metadata.serializedValueSize());
}
@Test
- @SuppressWarnings("deprecation")
- public void testNullChecksum() {
+ @Deprecated
+ public void testConstructionWithChecksum() {
+ TopicPartition tp = new TopicPartition("foo", 0);
long timestamp = 2340234L;
+ long baseOffset = 15L;
+ long batchIndex = 3L;
int keySize = 3;
int valueSize = 5;
- RecordMetadata metadata = new RecordMetadata(new TopicPartition("foo", 0), 15L, 3L, timestamp, null,
- keySize, valueSize);
- assertEquals(DefaultRecord.computePartialChecksum(timestamp, keySize, valueSize), metadata.checksum());
+
+ RecordMetadata metadata = new RecordMetadata(tp, baseOffset, batchIndex, timestamp, null, keySize, valueSize);
+ assertEquals(tp.topic(), metadata.topic());
+ assertEquals(tp.partition(), metadata.partition());
+ assertEquals(timestamp, metadata.timestamp());
+ assertEquals(baseOffset + batchIndex, metadata.offset());
+ assertEquals(keySize, metadata.serializedKeySize());
+ assertEquals(valueSize, metadata.serializedValueSize());
+
+ long checksum = 133424L;
+ metadata = new RecordMetadata(tp, baseOffset, batchIndex, timestamp, checksum, keySize, valueSize);
+ assertEquals(tp.topic(), metadata.topic());
+ assertEquals(tp.partition(), metadata.partition());
+ assertEquals(timestamp, metadata.timestamp());
+ assertEquals(baseOffset + batchIndex, metadata.offset());
+ assertEquals(keySize, metadata.serializedKeySize());
+ assertEquals(valueSize, metadata.serializedValueSize());
}
}
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/RecordSendTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/RecordSendTest.java
index b7cfd4e..86632ee 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/RecordSendTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/RecordSendTest.java
@@ -48,7 +48,7 @@ public class RecordSendTest {
public void testTimeout() throws Exception {
ProduceRequestResult request = new ProduceRequestResult(topicPartition);
FutureRecordMetadata future = new FutureRecordMetadata(request, relOffset,
- RecordBatch.NO_TIMESTAMP, 0L, 0, 0, Time.SYSTEM);
+ RecordBatch.NO_TIMESTAMP, 0, 0, Time.SYSTEM);
assertFalse(future.isDone(), "Request is not completed");
try {
future.get(5, TimeUnit.MILLISECONDS);
@@ -68,7 +68,7 @@ public class RecordSendTest {
@Test
public void testError() throws Exception {
FutureRecordMetadata future = new FutureRecordMetadata(asyncRequest(baseOffset, new CorruptRecordException(), 50L),
- relOffset, RecordBatch.NO_TIMESTAMP, 0L, 0, 0, Time.SYSTEM);
+ relOffset, RecordBatch.NO_TIMESTAMP, 0, 0, Time.SYSTEM);
assertThrows(ExecutionException.class, future::get);
}
@@ -78,7 +78,7 @@ public class RecordSendTest {
@Test
public void testBlocking() throws Exception {
FutureRecordMetadata future = new FutureRecordMetadata(asyncRequest(baseOffset, null, 50L),
- relOffset, RecordBatch.NO_TIMESTAMP, 0L, 0, 0, Time.SYSTEM);
+ relOffset, RecordBatch.NO_TIMESTAMP, 0, 0, Time.SYSTEM);
assertEquals(baseOffset + relOffset, future.get().offset());
}
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/FutureRecordMetadataTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/FutureRecordMetadataTest.java
index 74649c8..1fd7a43 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/FutureRecordMetadataTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/FutureRecordMetadataTest.java
@@ -67,7 +67,6 @@ public class FutureRecordMetadataTest {
produceRequestResult,
0,
RecordBatch.NO_TIMESTAMP,
- 0L,
0,
0,
time
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/ProducerBatchTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/ProducerBatchTest.java
index b6ee582..6af8289 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/ProducerBatchTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/ProducerBatchTest.java
@@ -23,7 +23,6 @@ import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.kafka.common.record.CompressionType;
-import org.apache.kafka.common.record.LegacyRecord;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.MemoryRecordsBuilder;
import org.apache.kafka.common.record.Record;
@@ -61,14 +60,6 @@ public class ProducerBatchTest {
CompressionType.NONE, TimestampType.CREATE_TIME, 128);
@Test
- public void testChecksumNullForMagicV2() {
- ProducerBatch batch = new ProducerBatch(new TopicPartition("topic", 1), memoryRecordsBuilder, now);
- FutureRecordMetadata future = batch.tryAppend(now, null, new byte[10], Record.EMPTY_HEADERS, null, now);
- assertNotNull(future);
- assertNull(future.checksumOrNull());
- }
-
- @Test
public void testBatchAbort() throws Exception {
ProducerBatch batch = new ProducerBatch(new TopicPartition("topic", 1), memoryRecordsBuilder, now);
MockCallback callback = new MockCallback();
@@ -139,23 +130,6 @@ public class ProducerBatchTest {
}
@Test
- public void testAppendedChecksumMagicV0AndV1() {
- for (byte magic : Arrays.asList(MAGIC_VALUE_V0, MAGIC_VALUE_V1)) {
- MemoryRecordsBuilder builder = MemoryRecords.builder(ByteBuffer.allocate(128), magic,
- CompressionType.NONE, TimestampType.CREATE_TIME, 0L);
- ProducerBatch batch = new ProducerBatch(new TopicPartition("topic", 1), builder, now);
- byte[] key = "hi".getBytes();
- byte[] value = "there".getBytes();
-
- FutureRecordMetadata future = batch.tryAppend(now, key, value, Record.EMPTY_HEADERS, null, now);
- assertNotNull(future);
- byte attributes = LegacyRecord.computeAttributes(magic, CompressionType.NONE, TimestampType.CREATE_TIME);
- long expectedChecksum = LegacyRecord.computeChecksum(magic, attributes, now, key, value);
- assertEquals(expectedChecksum, future.checksumOrNull().longValue());
- }
- }
-
- @Test
public void testSplitPreservesHeaders() {
for (CompressionType compressionType : CompressionType.values()) {
MemoryRecordsBuilder builder = MemoryRecords.builder(
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/ProducerInterceptorsTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/ProducerInterceptorsTest.java
index 3cc63a9..cd15a3e 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/ProducerInterceptorsTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/ProducerInterceptorsTest.java
@@ -144,7 +144,7 @@ public class ProducerInterceptorsTest {
ProducerInterceptors<Integer, String> interceptors = new ProducerInterceptors<>(interceptorList);
// verify onAck is called on all interceptors
- RecordMetadata meta = new RecordMetadata(tp, 0, 0, 0, Long.valueOf(0L), 0, 0);
+ RecordMetadata meta = new RecordMetadata(tp, 0, 0, 0, 0, 0);
interceptors.onAcknowledgement(meta, null);
assertEquals(2, onAckCount);
diff --git a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java
index 13eaa9d..5dedf66 100644
--- a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java
@@ -437,11 +437,10 @@ public class MemoryRecordsBuilderTest {
TimestampType.CREATE_TIME, 0L, LegacyRecord.NO_TIMESTAMP, RecordBatch.NO_PRODUCER_ID,
RecordBatch.NO_PRODUCER_EPOCH, RecordBatch.NO_SEQUENCE, false, false,
RecordBatch.NO_PARTITION_LEADER_EPOCH, buffer.capacity());
- Long checksumOrNull = builder.append(1L, "key".getBytes(), "value".getBytes());
+ builder.append(1L, "key".getBytes(), "value".getBytes());
MemoryRecords memoryRecords = builder.build();
List<Record> records = TestUtils.toList(memoryRecords.records());
assertEquals(1, records.size());
- assertEquals(checksumOrNull, records.get(0).checksumOrNull());
}
@ParameterizedTest
diff --git a/clients/src/test/java/org/apache/kafka/test/MockConsumerInterceptor.java b/clients/src/test/java/org/apache/kafka/test/MockConsumerInterceptor.java
index 3fcc157..b01584b 100644
--- a/clients/src/test/java/org/apache/kafka/test/MockConsumerInterceptor.java
+++ b/clients/src/test/java/org/apache/kafka/test/MockConsumerInterceptor.java
@@ -26,12 +26,14 @@ import org.apache.kafka.common.ClusterResourceListener;
import org.apache.kafka.common.ClusterResource;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.common.header.internals.RecordHeaders;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
+import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
@@ -56,7 +58,6 @@ public class MockConsumerInterceptor implements ClusterResourceListener, Consume
}
@Override
- @SuppressWarnings("deprecation")
public ConsumerRecords<String, String> onConsume(ConsumerRecords<String, String> records) {
// This will ensure that we get the cluster metadata when onConsume is called for the first time
@@ -69,13 +70,14 @@ public class MockConsumerInterceptor implements ClusterResourceListener, Consume
for (ConsumerRecord<String, String> record: records.records(tp)) {
lst.add(new ConsumerRecord<>(record.topic(), record.partition(), record.offset(),
record.timestamp(), record.timestampType(),
- record.checksum(), record.serializedKeySize(),
+ record.serializedKeySize(),
record.serializedValueSize(),
- record.key(), record.value().toUpperCase(Locale.ROOT)));
+ record.key(), record.value().toUpperCase(Locale.ROOT),
+ new RecordHeaders(), Optional.empty()));
}
recordMap.put(tp, lst);
}
- return new ConsumerRecords<String, String>(recordMap);
+ return new ConsumerRecords<>(recordMap);
}
@Override
diff --git a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceTaskTest.java b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceTaskTest.java
index 0b47cc9..9cf09f8 100644
--- a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceTaskTest.java
+++ b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceTaskTest.java
@@ -31,6 +31,7 @@ import org.junit.jupiter.api.Test;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
+import java.util.Optional;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -49,7 +50,7 @@ public class MirrorSourceTaskTest {
headers.add("header1", new byte[]{'l', 'm', 'n', 'o'});
headers.add("header2", new byte[]{'p', 'q', 'r', 's', 't'});
ConsumerRecord<byte[], byte[]> consumerRecord = new ConsumerRecord<>("topic1", 2, 3L, 4L,
- TimestampType.CREATE_TIME, 0L, 5, 6, key, value, headers);
+ TimestampType.CREATE_TIME, 5, 6, key, value, headers, Optional.empty());
MirrorSourceTask mirrorSourceTask = new MirrorSourceTask(null, null, "cluster7",
new DefaultReplicationPolicy(), 50);
SourceRecord sourceRecord = mirrorSourceTask.convertRecord(consumerRecord);
@@ -111,9 +112,9 @@ public class MirrorSourceTaskTest {
new RecordHeader(headerKey, "value".getBytes()),
});
consumerRecordsList.add(new ConsumerRecord<>(topicName, 0, 0, System.currentTimeMillis(),
- TimestampType.CREATE_TIME, 0L, key1.length, value1.length, key1, value1, headers));
+ TimestampType.CREATE_TIME, key1.length, value1.length, key1, value1, headers, Optional.empty()));
consumerRecordsList.add(new ConsumerRecord<>(topicName, 1, 1, System.currentTimeMillis(),
- TimestampType.CREATE_TIME, 0L, key2.length, value2.length, key2, value2, headers));
+ TimestampType.CREATE_TIME, key2.length, value2.length, key2, value2, headers, Optional.empty()));
ConsumerRecords<byte[], byte[]> consumerRecords =
new ConsumerRecords<>(Collections.singletonMap(new TopicPartition(topicName, 0), consumerRecordsList));
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/WorkerErrantRecordReporter.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/WorkerErrantRecordReporter.java
index b709e28..aa1d0be 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/WorkerErrantRecordReporter.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/WorkerErrantRecordReporter.java
@@ -33,6 +33,7 @@ import org.slf4j.LoggerFactory;
import java.util.LinkedList;
import java.util.List;
+import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
@@ -95,8 +96,8 @@ public class WorkerErrantRecordReporter implements ErrantRecordReporter {
int valLength = value != null ? value.length : -1;
consumerRecord = new ConsumerRecord<>(record.topic(), record.kafkaPartition(),
- record.kafkaOffset(), record.timestamp(), record.timestampType(), -1L, keyLength,
- valLength, key, value, headers);
+ record.kafkaOffset(), record.timestamp(), record.timestampType(), keyLength,
+ valLength, key, value, headers, Optional.empty());
}
Future<Void> future = retryWithToleranceOperator.executeFailed(Stage.TASK_PUT, SinkTask.class, consumerRecord, error);
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
index a7d68a6..b7492d3 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
@@ -75,6 +75,7 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
@@ -1111,8 +1112,10 @@ public class WorkerSinkTaskTest {
long timestamp = RecordBatch.NO_TIMESTAMP;
TimestampType timestampType = TimestampType.NO_TIMESTAMP_TYPE;
List<ConsumerRecord<byte[], byte[]>> records = new ArrayList<>();
- records.add(new ConsumerRecord<>(TOPIC, PARTITION, FIRST_OFFSET + recordsReturnedTp1 + 1, timestamp, timestampType, 0L, 0, 0, RAW_KEY, RAW_VALUE));
- records.add(new ConsumerRecord<>(TOPIC, PARTITION3, FIRST_OFFSET + recordsReturnedTp3 + 1, timestamp, timestampType, 0L, 0, 0, RAW_KEY, RAW_VALUE));
+ records.add(new ConsumerRecord<>(TOPIC, PARTITION, FIRST_OFFSET + recordsReturnedTp1 + 1, timestamp, timestampType,
+ 0, 0, RAW_KEY, RAW_VALUE, new RecordHeaders(), Optional.empty()));
+ records.add(new ConsumerRecord<>(TOPIC, PARTITION3, FIRST_OFFSET + recordsReturnedTp3 + 1, timestamp, timestampType,
+ 0, 0, RAW_KEY, RAW_VALUE, new RecordHeaders(), Optional.empty()));
recordsReturnedTp1 += 1;
recordsReturnedTp3 += 1;
return new ConsumerRecords<>(Collections.singletonMap(new TopicPartition(TOPIC, PARTITION), records));
@@ -1495,9 +1498,9 @@ public class WorkerSinkTaskTest {
expectConsumerPoll(Arrays.asList(
new ConsumerRecord<>(TOPIC, PARTITION, FIRST_OFFSET + recordsReturnedTp1 + 1, RecordBatch.NO_TIMESTAMP, TimestampType.NO_TIMESTAMP_TYPE,
- 0L, 0, 0, keyA.getBytes(), valueA.getBytes(encodingA), headersA),
+ 0, 0, keyA.getBytes(), valueA.getBytes(encodingA), headersA, Optional.empty()),
new ConsumerRecord<>(TOPIC, PARTITION, FIRST_OFFSET + recordsReturnedTp1 + 2, RecordBatch.NO_TIMESTAMP, TimestampType.NO_TIMESTAMP_TYPE,
- 0L, 0, 0, keyB.getBytes(), valueB.getBytes(encodingB), headersB)
+ 0, 0, keyB.getBytes(), valueB.getBytes(encodingB), headersB, Optional.empty())
));
expectTransformation(2, null);
@@ -1614,7 +1617,8 @@ public class WorkerSinkTaskTest {
() -> {
List<ConsumerRecord<byte[], byte[]>> records = new ArrayList<>();
for (int i = 0; i < numMessages; i++)
- records.add(new ConsumerRecord<>(TOPIC, PARTITION, FIRST_OFFSET + recordsReturnedTp1 + i, timestamp, timestampType, 0L, 0, 0, RAW_KEY, RAW_VALUE, headers));
+ records.add(new ConsumerRecord<>(TOPIC, PARTITION, FIRST_OFFSET + recordsReturnedTp1 + i, timestamp, timestampType,
+ 0, 0, RAW_KEY, RAW_VALUE, headers, Optional.empty()));
recordsReturnedTp1 += numMessages;
return new ConsumerRecords<>(
numMessages > 0 ?
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java
index 336f24b..7b4474b 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java
@@ -64,6 +64,7 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
@@ -555,9 +556,8 @@ public class WorkerSinkTaskThreadedTest extends ThreadedTest {
ConsumerRecords<byte[], byte[]> records = new ConsumerRecords<>(
Collections.singletonMap(
new TopicPartition(TOPIC, PARTITION),
- Arrays.asList(
- new ConsumerRecord<>(TOPIC, PARTITION, FIRST_OFFSET + recordsReturned, TIMESTAMP, TIMESTAMP_TYPE, 0L, 0, 0, RAW_KEY, RAW_VALUE)
- )));
+ Arrays.asList(new ConsumerRecord<>(TOPIC, PARTITION, FIRST_OFFSET + recordsReturned, TIMESTAMP, TIMESTAMP_TYPE,
+ 0, 0, RAW_KEY, RAW_VALUE, new RecordHeaders(), Optional.empty()))));
recordsReturned++;
return records;
});
@@ -586,9 +586,8 @@ public class WorkerSinkTaskThreadedTest extends ThreadedTest {
ConsumerRecords<byte[], byte[]> records = new ConsumerRecords<>(
Collections.singletonMap(
new TopicPartition(TOPIC, PARTITION),
- Arrays.asList(
- new ConsumerRecord<>(TOPIC, PARTITION, FIRST_OFFSET + recordsReturned, TIMESTAMP, TIMESTAMP_TYPE, 0L, 0, 0, RAW_KEY, RAW_VALUE)
- )));
+ Arrays.asList(new ConsumerRecord<>(TOPIC, PARTITION, FIRST_OFFSET + recordsReturned, TIMESTAMP, TIMESTAMP_TYPE,
+ 0, 0, RAW_KEY, RAW_VALUE, new RecordHeaders(), Optional.empty()))));
recordsReturned++;
return records;
});
@@ -617,8 +616,8 @@ public class WorkerSinkTaskThreadedTest extends ThreadedTest {
ConsumerRecords<byte[], byte[]> records = new ConsumerRecords<>(
Collections.singletonMap(
new TopicPartition(TOPIC, PARTITION),
- Arrays.asList(
- new ConsumerRecord<>(TOPIC, PARTITION, FIRST_OFFSET + recordsReturned, TIMESTAMP, TIMESTAMP_TYPE, 0L, 0, 0, RAW_KEY, RAW_VALUE)
+ Arrays.asList(new ConsumerRecord<>(TOPIC, PARTITION, FIRST_OFFSET + recordsReturned, TIMESTAMP, TIMESTAMP_TYPE,
+ 0, 0, RAW_KEY, RAW_VALUE, new RecordHeaders(), Optional.empty())
)));
recordsReturned++;
return records;
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
index aefc3a9..4d358ca 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
@@ -1435,7 +1435,7 @@ public class WorkerSourceTaskTest extends ThreadedTest {
for (org.apache.kafka.clients.producer.Callback cb : producerCallbacks.getValues()) {
if (sendSuccess) {
cb.onCompletion(new RecordMetadata(new TopicPartition("foo", 0), 0, 0,
- 0L, 0L, 0, 0), null);
+ 0L, 0, 0), null);
} else {
cb.onCompletion(null, new TopicAuthorizationException("foo"));
}
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/errors/ProcessingContextTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/errors/ProcessingContextTest.java
index 3e5e082..89f1013 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/errors/ProcessingContextTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/errors/ProcessingContextTest.java
@@ -48,7 +48,7 @@ public class ProcessingContextTest {
Future<Void> result = context.report();
fs.forEach(f -> {
assertFalse(result.isDone());
- f.complete(new RecordMetadata(new TopicPartition("t", 0), 0, 0, 0, 0L, 0, 0));
+ f.complete(new RecordMetadata(new TopicPartition("t", 0), 0, 0, 0, 0, 0));
});
assertTrue(result.isDone());
}
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java
index 4504d39..d3960e5 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java
@@ -21,6 +21,7 @@ import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.connect.data.Field;
@@ -55,6 +56,7 @@ import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
@@ -464,11 +466,16 @@ public class KafkaConfigBackingStoreTest {
public void testRestoreTargetState() throws Exception {
expectConfigure();
List<ConsumerRecord<String, byte[]>> existingRecords = Arrays.asList(
- new ConsumerRecord<>(TOPIC, 0, 0, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, CONNECTOR_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(0)),
- new ConsumerRecord<>(TOPIC, 0, 1, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, TASK_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(1)),
- new ConsumerRecord<>(TOPIC, 0, 2, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, TASK_CONFIG_KEYS.get(1), CONFIGS_SERIALIZED.get(2)),
- new ConsumerRecord<>(TOPIC, 0, 3, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, TARGET_STATE_KEYS.get(0), CONFIGS_SERIALIZED.get(3)),
- new ConsumerRecord<>(TOPIC, 0, 4, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, COMMIT_TASKS_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(4)));
+ new ConsumerRecord<>(TOPIC, 0, 0, 0L, TimestampType.CREATE_TIME, 0, 0, CONNECTOR_CONFIG_KEYS.get(0),
+ CONFIGS_SERIALIZED.get(0), new RecordHeaders(), Optional.empty()),
+ new ConsumerRecord<>(TOPIC, 0, 1, 0L, TimestampType.CREATE_TIME, 0, 0, TASK_CONFIG_KEYS.get(0),
+ CONFIGS_SERIALIZED.get(1), new RecordHeaders(), Optional.empty()),
+ new ConsumerRecord<>(TOPIC, 0, 2, 0L, TimestampType.CREATE_TIME, 0, 0, TASK_CONFIG_KEYS.get(1),
+ CONFIGS_SERIALIZED.get(2), new RecordHeaders(), Optional.empty()),
+ new ConsumerRecord<>(TOPIC, 0, 3, 0L, TimestampType.CREATE_TIME, 0, 0, TARGET_STATE_KEYS.get(0),
+ CONFIGS_SERIALIZED.get(3), new RecordHeaders(), Optional.empty()),
+ new ConsumerRecord<>(TOPIC, 0, 4, 0L, TimestampType.CREATE_TIME, 0, 0, COMMIT_TASKS_CONFIG_KEYS.get(0),
+ CONFIGS_SERIALIZED.get(4), new RecordHeaders(), Optional.empty()));
LinkedHashMap<byte[], Struct> deserialized = new LinkedHashMap<>();
deserialized.put(CONFIGS_SERIALIZED.get(0), CONNECTOR_CONFIG_STRUCTS.get(0));
deserialized.put(CONFIGS_SERIALIZED.get(1), TASK_CONFIG_STRUCTS.get(0));
@@ -506,10 +513,14 @@ public class KafkaConfigBackingStoreTest {
expectConfigure();
List<ConsumerRecord<String, byte[]>> existingRecords = Arrays.asList(
- new ConsumerRecord<>(TOPIC, 0, 0, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, CONNECTOR_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(0)),
- new ConsumerRecord<>(TOPIC, 0, 1, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, TASK_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(1)),
- new ConsumerRecord<>(TOPIC, 0, 2, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, TASK_CONFIG_KEYS.get(1), CONFIGS_SERIALIZED.get(2)),
- new ConsumerRecord<>(TOPIC, 0, 3, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, COMMIT_TASKS_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(3)));
+ new ConsumerRecord<>(TOPIC, 0, 0, 0L, TimestampType.CREATE_TIME, 0, 0, CONNECTOR_CONFIG_KEYS.get(0),
+ CONFIGS_SERIALIZED.get(0), new RecordHeaders(), Optional.empty()),
+ new ConsumerRecord<>(TOPIC, 0, 1, 0L, TimestampType.CREATE_TIME, 0, 0, TASK_CONFIG_KEYS.get(0),
+ CONFIGS_SERIALIZED.get(1), new RecordHeaders(), Optional.empty()),
+ new ConsumerRecord<>(TOPIC, 0, 2, 0L, TimestampType.CREATE_TIME, 0, 0, TASK_CONFIG_KEYS.get(1),
+ CONFIGS_SERIALIZED.get(2), new RecordHeaders(), Optional.empty()),
+ new ConsumerRecord<>(TOPIC, 0, 3, 0L, TimestampType.CREATE_TIME, 0, 0, COMMIT_TASKS_CONFIG_KEYS.get(0),
+ CONFIGS_SERIALIZED.get(3), new RecordHeaders(), Optional.empty()));
LinkedHashMap<byte[], Struct> deserialized = new LinkedHashMap<>();
deserialized.put(CONFIGS_SERIALIZED.get(0), CONNECTOR_CONFIG_STRUCTS.get(0));
deserialized.put(CONFIGS_SERIALIZED.get(1), TASK_CONFIG_STRUCTS.get(0));
@@ -549,10 +560,14 @@ public class KafkaConfigBackingStoreTest {
expectConfigure();
List<ConsumerRecord<String, byte[]>> existingRecords = Arrays.asList(
- new ConsumerRecord<>(TOPIC, 0, 0, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, CONNECTOR_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(0)),
- new ConsumerRecord<>(TOPIC, 0, 1, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, TASK_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(1)),
- new ConsumerRecord<>(TOPIC, 0, 2, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, TASK_CONFIG_KEYS.get(1), CONFIGS_SERIALIZED.get(2)),
- new ConsumerRecord<>(TOPIC, 0, 3, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, COMMIT_TASKS_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(3)));
+ new ConsumerRecord<>(TOPIC, 0, 0, 0L, TimestampType.CREATE_TIME, 0, 0, CONNECTOR_CONFIG_KEYS.get(0),
+ CONFIGS_SERIALIZED.get(0), new RecordHeaders(), Optional.empty()),
+ new ConsumerRecord<>(TOPIC, 0, 1, 0L, TimestampType.CREATE_TIME, 0, 0, TASK_CONFIG_KEYS.get(0),
+ CONFIGS_SERIALIZED.get(1), new RecordHeaders(), Optional.empty()),
+ new ConsumerRecord<>(TOPIC, 0, 2, 0L, TimestampType.CREATE_TIME, 0, 0, TASK_CONFIG_KEYS.get(1),
+ CONFIGS_SERIALIZED.get(2), new RecordHeaders(), Optional.empty()),
+ new ConsumerRecord<>(TOPIC, 0, 3, 0L, TimestampType.CREATE_TIME, 0, 0, COMMIT_TASKS_CONFIG_KEYS.get(0),
+ CONFIGS_SERIALIZED.get(3), new RecordHeaders(), Optional.empty()));
LinkedHashMap<byte[], Struct> deserialized = new LinkedHashMap<>();
deserialized.put(CONFIGS_SERIALIZED.get(0), CONNECTOR_CONFIG_STRUCTS.get(0));
deserialized.put(CONFIGS_SERIALIZED.get(1), TASK_CONFIG_STRUCTS.get(0));
@@ -607,11 +622,16 @@ public class KafkaConfigBackingStoreTest {
public void testRestoreTargetStateUnexpectedDeletion() throws Exception {
expectConfigure();
List<ConsumerRecord<String, byte[]>> existingRecords = Arrays.asList(
- new ConsumerRecord<>(TOPIC, 0, 0, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, CONNECTOR_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(0)),
- new ConsumerRecord<>(TOPIC, 0, 1, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, TASK_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(1)),
- new ConsumerRecord<>(TOPIC, 0, 2, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, TASK_CONFIG_KEYS.get(1), CONFIGS_SERIALIZED.get(2)),
- new ConsumerRecord<>(TOPIC, 0, 3, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, TARGET_STATE_KEYS.get(0), CONFIGS_SERIALIZED.get(3)),
- new ConsumerRecord<>(TOPIC, 0, 4, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, COMMIT_TASKS_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(4)));
+ new ConsumerRecord<>(TOPIC, 0, 0, 0L, TimestampType.CREATE_TIME, 0, 0, CONNECTOR_CONFIG_KEYS.get(0),
+ CONFIGS_SERIALIZED.get(0), new RecordHeaders(), Optional.empty()),
+ new ConsumerRecord<>(TOPIC, 0, 1, 0L, TimestampType.CREATE_TIME, 0, 0, TASK_CONFIG_KEYS.get(0),
+ CONFIGS_SERIALIZED.get(1), new RecordHeaders(), Optional.empty()),
+ new ConsumerRecord<>(TOPIC, 0, 2, 0L, TimestampType.CREATE_TIME, 0, 0, TASK_CONFIG_KEYS.get(1),
+ CONFIGS_SERIALIZED.get(2), new RecordHeaders(), Optional.empty()),
+ new ConsumerRecord<>(TOPIC, 0, 3, 0L, TimestampType.CREATE_TIME, 0, 0, TARGET_STATE_KEYS.get(0),
+ CONFIGS_SERIALIZED.get(3), new RecordHeaders(), Optional.empty()),
+ new ConsumerRecord<>(TOPIC, 0, 4, 0L, TimestampType.CREATE_TIME, 0, 0, COMMIT_TASKS_CONFIG_KEYS.get(0),
+ CONFIGS_SERIALIZED.get(4), new RecordHeaders(), Optional.empty()));
LinkedHashMap<byte[], Struct> deserialized = new LinkedHashMap<>();
deserialized.put(CONFIGS_SERIALIZED.get(0), CONNECTOR_CONFIG_STRUCTS.get(0));
deserialized.put(CONFIGS_SERIALIZED.get(1), TASK_CONFIG_STRUCTS.get(0));
@@ -651,14 +671,21 @@ public class KafkaConfigBackingStoreTest {
expectConfigure();
// Overwrite each type at least once to ensure we see the latest data after loading
List<ConsumerRecord<String, byte[]>> existingRecords = Arrays.asList(
- new ConsumerRecord<>(TOPIC, 0, 0, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, CONNECTOR_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(0)),
- new ConsumerRecord<>(TOPIC, 0, 1, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, TASK_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(1)),
- new ConsumerRecord<>(TOPIC, 0, 2, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, TASK_CONFIG_KEYS.get(1), CONFIGS_SERIALIZED.get(2)),
- new ConsumerRecord<>(TOPIC, 0, 3, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, CONNECTOR_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(3)),
- new ConsumerRecord<>(TOPIC, 0, 4, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, COMMIT_TASKS_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(4)),
+ new ConsumerRecord<>(TOPIC, 0, 0, 0L, TimestampType.CREATE_TIME, 0, 0, CONNECTOR_CONFIG_KEYS.get(0),
+ CONFIGS_SERIALIZED.get(0), new RecordHeaders(), Optional.empty()),
+ new ConsumerRecord<>(TOPIC, 0, 1, 0L, TimestampType.CREATE_TIME, 0, 0, TASK_CONFIG_KEYS.get(0),
+ CONFIGS_SERIALIZED.get(1), new RecordHeaders(), Optional.empty()),
+ new ConsumerRecord<>(TOPIC, 0, 2, 0L, TimestampType.CREATE_TIME, 0, 0, TASK_CONFIG_KEYS.get(1),
+ CONFIGS_SERIALIZED.get(2), new RecordHeaders(), Optional.empty()),
+ new ConsumerRecord<>(TOPIC, 0, 3, 0L, TimestampType.CREATE_TIME, 0, 0, CONNECTOR_CONFIG_KEYS.get(0),
+ CONFIGS_SERIALIZED.get(3), new RecordHeaders(), Optional.empty()),
+ new ConsumerRecord<>(TOPIC, 0, 4, 0L, TimestampType.CREATE_TIME, 0, 0, COMMIT_TASKS_CONFIG_KEYS.get(0),
+ CONFIGS_SERIALIZED.get(4), new RecordHeaders(), Optional.empty()),
// Connector after root update should make it through, task update shouldn't
- new ConsumerRecord<>(TOPIC, 0, 5, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, CONNECTOR_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(5)),
- new ConsumerRecord<>(TOPIC, 0, 6, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, TASK_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(6)));
+ new ConsumerRecord<>(TOPIC, 0, 5, 0L, TimestampType.CREATE_TIME, 0, 0, CONNECTOR_CONFIG_KEYS.get(0),
+ CONFIGS_SERIALIZED.get(5), new RecordHeaders(), Optional.empty()),
+ new ConsumerRecord<>(TOPIC, 0, 6, 0L, TimestampType.CREATE_TIME, 0, 0, TASK_CONFIG_KEYS.get(0),
+ CONFIGS_SERIALIZED.get(6), new RecordHeaders(), Optional.empty()));
LinkedHashMap<byte[], Struct> deserialized = new LinkedHashMap<>();
deserialized.put(CONFIGS_SERIALIZED.get(0), CONNECTOR_CONFIG_STRUCTS.get(0));
deserialized.put(CONFIGS_SERIALIZED.get(1), TASK_CONFIG_STRUCTS.get(0));
@@ -707,12 +734,18 @@ public class KafkaConfigBackingStoreTest {
expectConfigure();
// Overwrite each type at least once to ensure we see the latest data after loading
List<ConsumerRecord<String, byte[]>> existingRecords = Arrays.asList(
- new ConsumerRecord<>(TOPIC, 0, 0, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, CONNECTOR_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(0)),
- new ConsumerRecord<>(TOPIC, 0, 1, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, TASK_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(1)),
- new ConsumerRecord<>(TOPIC, 0, 2, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, TASK_CONFIG_KEYS.get(1), CONFIGS_SERIALIZED.get(2)),
- new ConsumerRecord<>(TOPIC, 0, 3, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, CONNECTOR_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(3)),
- new ConsumerRecord<>(TOPIC, 0, 4, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, TARGET_STATE_KEYS.get(0), CONFIGS_SERIALIZED.get(4)),
- new ConsumerRecord<>(TOPIC, 0, 5, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, COMMIT_TASKS_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(5)));
+ new ConsumerRecord<>(TOPIC, 0, 0, 0L, TimestampType.CREATE_TIME, 0, 0, CONNECTOR_CONFIG_KEYS.get(0),
+ CONFIGS_SERIALIZED.get(0), new RecordHeaders(), Optional.empty()),
+ new ConsumerRecord<>(TOPIC, 0, 1, 0L, TimestampType.CREATE_TIME, 0, 0, TASK_CONFIG_KEYS.get(0),
+ CONFIGS_SERIALIZED.get(1), new RecordHeaders(), Optional.empty()),
+ new ConsumerRecord<>(TOPIC, 0, 2, 0L, TimestampType.CREATE_TIME, 0, 0, TASK_CONFIG_KEYS.get(1),
+ CONFIGS_SERIALIZED.get(2), new RecordHeaders(), Optional.empty()),
+ new ConsumerRecord<>(TOPIC, 0, 3, 0L, TimestampType.CREATE_TIME, 0, 0, CONNECTOR_CONFIG_KEYS.get(0),
+ CONFIGS_SERIALIZED.get(3), new RecordHeaders(), Optional.empty()),
+ new ConsumerRecord<>(TOPIC, 0, 4, 0L, TimestampType.CREATE_TIME, 0, 0, TARGET_STATE_KEYS.get(0),
+ CONFIGS_SERIALIZED.get(4), new RecordHeaders(), Optional.empty()),
+ new ConsumerRecord<>(TOPIC, 0, 5, 0L, TimestampType.CREATE_TIME, 0, 0, COMMIT_TASKS_CONFIG_KEYS.get(0),
+ CONFIGS_SERIALIZED.get(5), new RecordHeaders(), Optional.empty()));
LinkedHashMap<byte[], Struct> deserialized = new LinkedHashMap<>();
deserialized.put(CONFIGS_SERIALIZED.get(0), CONNECTOR_CONFIG_STRUCTS.get(0));
@@ -752,15 +785,23 @@ public class KafkaConfigBackingStoreTest {
expectConfigure();
// Overwrite each type at least once to ensure we see the latest data after loading
List<ConsumerRecord<String, byte[]>> existingRecords = Arrays.asList(
- new ConsumerRecord<>(TOPIC, 0, 0, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, CONNECTOR_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(0)),
- new ConsumerRecord<>(TOPIC, 0, 1, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, TASK_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(1)),
- new ConsumerRecord<>(TOPIC, 0, 2, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, TASK_CONFIG_KEYS.get(1), CONFIGS_SERIALIZED.get(2)),
- new ConsumerRecord<>(TOPIC, 0, 3, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, CONNECTOR_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(3)),
- new ConsumerRecord<>(TOPIC, 0, 4, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, COMMIT_TASKS_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(4)),
+ new ConsumerRecord<>(TOPIC, 0, 0, 0L, TimestampType.CREATE_TIME, 0, 0, CONNECTOR_CONFIG_KEYS.get(0),
+ CONFIGS_SERIALIZED.get(0), new RecordHeaders(), Optional.empty()),
+ new ConsumerRecord<>(TOPIC, 0, 1, 0L, TimestampType.CREATE_TIME, 0, 0, TASK_CONFIG_KEYS.get(0),
+ CONFIGS_SERIALIZED.get(1), new RecordHeaders(), Optional.empty()),
+ new ConsumerRecord<>(TOPIC, 0, 2, 0L, TimestampType.CREATE_TIME, 0, 0, TASK_CONFIG_KEYS.get(1),
+ CONFIGS_SERIALIZED.get(2), new RecordHeaders(), Optional.empty()),
+ new ConsumerRecord<>(TOPIC, 0, 3, 0L, TimestampType.CREATE_TIME, 0, 0, CONNECTOR_CONFIG_KEYS.get(0),
+ CONFIGS_SERIALIZED.get(3), new RecordHeaders(), Optional.empty()),
+ new ConsumerRecord<>(TOPIC, 0, 4, 0L, TimestampType.CREATE_TIME, 0, 0, COMMIT_TASKS_CONFIG_KEYS.get(0),
+ CONFIGS_SERIALIZED.get(4), new RecordHeaders(), Optional.empty()),
// Connector after root update should make it through, task update shouldn't
- new ConsumerRecord<>(TOPIC, 0, 5, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, CONNECTOR_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(5)),
- new ConsumerRecord<>(TOPIC, 0, 6, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, TASK_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(6)),
- new ConsumerRecord<>(TOPIC, 0, 7, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, COMMIT_TASKS_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(7)));
+ new ConsumerRecord<>(TOPIC, 0, 5, 0L, TimestampType.CREATE_TIME, 0, 0, CONNECTOR_CONFIG_KEYS.get(0),
+ CONFIGS_SERIALIZED.get(5), new RecordHeaders(), Optional.empty()),
+ new ConsumerRecord<>(TOPIC, 0, 6, 0L, TimestampType.CREATE_TIME, 0, 0, TASK_CONFIG_KEYS.get(0),
+ CONFIGS_SERIALIZED.get(6), new RecordHeaders(), Optional.empty()),
+ new ConsumerRecord<>(TOPIC, 0, 7, 0L, TimestampType.CREATE_TIME, 0, 0, COMMIT_TASKS_CONFIG_KEYS.get(0),
+ CONFIGS_SERIALIZED.get(7), new RecordHeaders(), Optional.empty()));
LinkedHashMap<byte[], Struct> deserialized = new LinkedHashMap<>();
deserialized.put(CONFIGS_SERIALIZED.get(0), CONNECTOR_CONFIG_STRUCTS.get(0));
deserialized.put(CONFIGS_SERIALIZED.get(1), TASK_CONFIG_STRUCTS.get(0));
@@ -807,12 +848,16 @@ public class KafkaConfigBackingStoreTest {
expectConfigure();
List<ConsumerRecord<String, byte[]>> existingRecords = Arrays.asList(
- new ConsumerRecord<>(TOPIC, 0, 0, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, CONNECTOR_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(0)),
+ new ConsumerRecord<>(TOPIC, 0, 0, 0L, TimestampType.CREATE_TIME, 0, 0, CONNECTOR_CONFIG_KEYS.get(0),
+ CONFIGS_SERIALIZED.get(0), new RecordHeaders(), Optional.empty()),
// This is the record that has been compacted:
//new ConsumerRecord<>(TOPIC, 0, 1, TASK_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(1)),
- new ConsumerRecord<>(TOPIC, 0, 2, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, TASK_CONFIG_KEYS.get(1), CONFIGS_SERIALIZED.get(2)),
- new ConsumerRecord<>(TOPIC, 0, 4, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, COMMIT_TASKS_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(4)),
- new ConsumerRecord<>(TOPIC, 0, 5, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, TASK_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(5)));
+ new ConsumerRecord<>(TOPIC, 0, 2, 0L, TimestampType.CREATE_TIME, 0, 0, TASK_CONFIG_KEYS.get(1),
+ CONFIGS_SERIALIZED.get(2), new RecordHeaders(), Optional.empty()),
+ new ConsumerRecord<>(TOPIC, 0, 4, 0L, TimestampType.CREATE_TIME, 0, 0, COMMIT_TASKS_CONFIG_KEYS.get(0),
+ CONFIGS_SERIALIZED.get(4), new RecordHeaders(), Optional.empty()),
+ new ConsumerRecord<>(TOPIC, 0, 5, 0L, TimestampType.CREATE_TIME, 0, 0, TASK_CONFIG_KEYS.get(0),
+ CONFIGS_SERIALIZED.get(5), new RecordHeaders(), Optional.empty()));
LinkedHashMap<byte[], Struct> deserialized = new LinkedHashMap<>();
deserialized.put(CONFIGS_SERIALIZED.get(0), CONNECTOR_CONFIG_STRUCTS.get(0));
deserialized.put(CONFIGS_SERIALIZED.get(2), TASK_CONFIG_STRUCTS.get(0));
@@ -966,8 +1011,11 @@ public class KafkaConfigBackingStoreTest {
EasyMock.expect(storeLog.readToEnd())
.andAnswer(() -> {
TestFuture<Void> future = new TestFuture<>();
- for (Map.Entry<String, byte[]> entry : serializedConfigs.entrySet())
- capturedConsumedCallback.getValue().onCompletion(null, new ConsumerRecord<>(TOPIC, 0, logOffset++, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, entry.getKey(), entry.getValue()));
+ for (Map.Entry<String, byte[]> entry : serializedConfigs.entrySet()) {
+ capturedConsumedCallback.getValue().onCompletion(null,
+ new ConsumerRecord<>(TOPIC, 0, logOffset++, 0L, TimestampType.CREATE_TIME, 0, 0,
+ entry.getKey(), entry.getValue(), new RecordHeaders(), Optional.empty()));
+ }
future.resolveOnGet((Void) null);
return future;
});
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStoreTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStoreTest.java
index d216068..07eadc8 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStoreTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStoreTest.java
@@ -22,6 +22,7 @@ import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.connect.runtime.distributed.DistributedConfig;
import org.apache.kafka.connect.util.Callback;
@@ -46,6 +47,7 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
@@ -149,10 +151,14 @@ public class KafkaOffsetBackingStoreTest {
public void testReloadOnStart() throws Exception {
expectConfigure();
expectStart(Arrays.asList(
- new ConsumerRecord<>(TOPIC, 0, 0, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, TP0_KEY.array(), TP0_VALUE.array()),
- new ConsumerRecord<>(TOPIC, 1, 0, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, TP1_KEY.array(), TP1_VALUE.array()),
- new ConsumerRecord<>(TOPIC, 0, 1, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, TP0_KEY.array(), TP0_VALUE_NEW.array()),
- new ConsumerRecord<>(TOPIC, 1, 1, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, TP1_KEY.array(), TP1_VALUE_NEW.array())
+ new ConsumerRecord<>(TOPIC, 0, 0, 0L, TimestampType.CREATE_TIME, 0, 0, TP0_KEY.array(), TP0_VALUE.array(),
+ new RecordHeaders(), Optional.empty()),
+ new ConsumerRecord<>(TOPIC, 1, 0, 0L, TimestampType.CREATE_TIME, 0, 0, TP1_KEY.array(), TP1_VALUE.array(),
+ new RecordHeaders(), Optional.empty()),
+ new ConsumerRecord<>(TOPIC, 0, 1, 0L, TimestampType.CREATE_TIME, 0, 0, TP0_KEY.array(), TP0_VALUE_NEW.array(),
+ new RecordHeaders(), Optional.empty()),
+ new ConsumerRecord<>(TOPIC, 1, 1, 0L, TimestampType.CREATE_TIME, 0, 0, TP1_KEY.array(), TP1_VALUE_NEW.array(),
+ new RecordHeaders(), Optional.empty())
));
expectStop();
expectClusterId();
@@ -196,8 +202,12 @@ public class KafkaOffsetBackingStoreTest {
final Capture<Callback<Void>> secondGetReadToEndCallback = EasyMock.newCapture();
storeLog.readToEnd(EasyMock.capture(secondGetReadToEndCallback));
PowerMock.expectLastCall().andAnswer(() -> {
- capturedConsumedCallback.getValue().onCompletion(null, new ConsumerRecord<>(TOPIC, 0, 0, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, TP0_KEY.array(), TP0_VALUE.array()));
- capturedConsumedCallback.getValue().onCompletion(null, new ConsumerRecord<>(TOPIC, 1, 0, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, TP1_KEY.array(), TP1_VALUE.array()));
+ capturedConsumedCallback.getValue().onCompletion(null,
+ new ConsumerRecord<>(TOPIC, 0, 0, 0L, TimestampType.CREATE_TIME, 0, 0, TP0_KEY.array(), TP0_VALUE.array(),
+ new RecordHeaders(), Optional.empty()));
+ capturedConsumedCallback.getValue().onCompletion(null,
+ new ConsumerRecord<>(TOPIC, 1, 0, 0L, TimestampType.CREATE_TIME, 0, 0, TP1_KEY.array(), TP1_VALUE.array(),
+ new RecordHeaders(), Optional.empty()));
secondGetReadToEndCallback.getValue().onCompletion(null, null);
return null;
});
@@ -206,8 +216,12 @@ public class KafkaOffsetBackingStoreTest {
final Capture<Callback<Void>> thirdGetReadToEndCallback = EasyMock.newCapture();
storeLog.readToEnd(EasyMock.capture(thirdGetReadToEndCallback));
PowerMock.expectLastCall().andAnswer(() -> {
- capturedConsumedCallback.getValue().onCompletion(null, new ConsumerRecord<>(TOPIC, 0, 1, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, TP0_KEY.array(), TP0_VALUE_NEW.array()));
- capturedConsumedCallback.getValue().onCompletion(null, new ConsumerRecord<>(TOPIC, 1, 1, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, TP1_KEY.array(), TP1_VALUE_NEW.array()));
+ capturedConsumedCallback.getValue().onCompletion(null,
+ new ConsumerRecord<>(TOPIC, 0, 1, 0L, TimestampType.CREATE_TIME, 0, 0, TP0_KEY.array(), TP0_VALUE_NEW.array(),
+ new RecordHeaders(), Optional.empty()));
+ capturedConsumedCallback.getValue().onCompletion(null,
+ new ConsumerRecord<>(TOPIC, 1, 1, 0L, TimestampType.CREATE_TIME, 0, 0, TP1_KEY.array(), TP1_VALUE_NEW.array(),
+ new RecordHeaders(), Optional.empty()));
thirdGetReadToEndCallback.getValue().onCompletion(null, null);
return null;
});
@@ -271,8 +285,12 @@ public class KafkaOffsetBackingStoreTest {
final Capture<Callback<Void>> secondGetReadToEndCallback = EasyMock.newCapture();
storeLog.readToEnd(EasyMock.capture(secondGetReadToEndCallback));
PowerMock.expectLastCall().andAnswer(() -> {
- capturedConsumedCallback.getValue().onCompletion(null, new ConsumerRecord<>(TOPIC, 0, 0, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, null, TP0_VALUE.array()));
- capturedConsumedCallback.getValue().onCompletion(null, new ConsumerRecord<>(TOPIC, 1, 0, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, TP1_KEY.array(), null));
+ capturedConsumedCallback.getValue().onCompletion(null,
+ new ConsumerRecord<>(TOPIC, 0, 0, 0L, TimestampType.CREATE_TIME, 0, 0, null, TP0_VALUE.array(),
+ new RecordHeaders(), Optional.empty()));
+ capturedConsumedCallback.getValue().onCompletion(null,
+ new ConsumerRecord<>(TOPIC, 1, 0, 0L, TimestampType.CREATE_TIME, 0, 0, TP1_KEY.array(), null,
+ new RecordHeaders(), Optional.empty()));
secondGetReadToEndCallback.getValue().onCompletion(null, null);
return null;
});
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaStatusBackingStoreTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaStatusBackingStoreTest.java
index 4c0c7f9..f19be75 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaStatusBackingStoreTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaStatusBackingStoreTest.java
@@ -21,6 +21,7 @@ import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.errors.UnknownServerException;
+import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.connect.data.Schema;
@@ -45,6 +46,7 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
+import java.util.Optional;
import static org.easymock.EasyMock.anyObject;
import static org.easymock.EasyMock.capture;
@@ -436,7 +438,7 @@ public class KafkaStatusBackingStoreTest extends EasyMockSupport {
private static ConsumerRecord<String, byte[]> consumerRecord(long offset, String key, byte[] value) {
return new ConsumerRecord<>(STATUS_TOPIC, 0, offset, System.currentTimeMillis(),
- TimestampType.CREATE_TIME, 0L, 0, 0, key, value);
+ TimestampType.CREATE_TIME, 0, 0, key, value, new RecordHeaders(), Optional.empty());
}
}
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/util/KafkaBasedLogTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/util/KafkaBasedLogTest.java
index e36f2a9..8fae57e 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/util/KafkaBasedLogTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/util/KafkaBasedLogTest.java
@@ -33,6 +33,7 @@ import org.apache.kafka.common.errors.LeaderNotAvailableException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.errors.WakeupException;
+import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.utils.Time;
@@ -56,6 +57,7 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
@@ -187,12 +189,14 @@ public class KafkaBasedLogTest {
consumer.scheduleNopPollTask();
consumer.scheduleNopPollTask();
consumer.schedulePollTask(() ->
- consumer.addRecord(new ConsumerRecord<>(TOPIC, 0, 0, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, TP0_KEY, TP0_VALUE))
+ consumer.addRecord(new ConsumerRecord<>(TOPIC, 0, 0, 0L, TimestampType.CREATE_TIME, 0, 0, TP0_KEY, TP0_VALUE,
+ new RecordHeaders(), Optional.empty()))
);
consumer.scheduleNopPollTask();
consumer.scheduleNopPollTask();
consumer.schedulePollTask(() ->
- consumer.addRecord(new ConsumerRecord<>(TOPIC, 1, 0, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, TP1_KEY, TP1_VALUE))
+ consumer.addRecord(new ConsumerRecord<>(TOPIC, 1, 0, 0L, TimestampType.CREATE_TIME, 0, 0, TP1_KEY, TP1_VALUE,
+ new RecordHeaders(), Optional.empty()))
);
consumer.schedulePollTask(finishedLatch::countDown);
});
@@ -306,13 +310,17 @@ public class KafkaBasedLogTest {
consumer.scheduleNopPollTask();
consumer.scheduleNopPollTask();
consumer.schedulePollTask(() -> {
- consumer.addRecord(new ConsumerRecord<>(TOPIC, 0, 0, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, TP0_KEY, TP0_VALUE));
- consumer.addRecord(new ConsumerRecord<>(TOPIC, 0, 1, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, TP0_KEY, TP0_VALUE_NEW));
- consumer.addRecord(new ConsumerRecord<>(TOPIC, 1, 0, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, TP1_KEY, TP1_VALUE));
+ consumer.addRecord(new ConsumerRecord<>(TOPIC, 0, 0, 0L, TimestampType.CREATE_TIME, 0, 0, TP0_KEY, TP0_VALUE,
+ new RecordHeaders(), Optional.empty()));
+ consumer.addRecord(new ConsumerRecord<>(TOPIC, 0, 1, 0L, TimestampType.CREATE_TIME, 0, 0, TP0_KEY, TP0_VALUE_NEW,
+ new RecordHeaders(), Optional.empty()));
+ consumer.addRecord(new ConsumerRecord<>(TOPIC, 1, 0, 0L, TimestampType.CREATE_TIME, 0, 0, TP1_KEY, TP1_VALUE,
+ new RecordHeaders(), Optional.empty()));
});
consumer.schedulePollTask(() ->
- consumer.addRecord(new ConsumerRecord<>(TOPIC, 1, 1, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, TP1_KEY, TP1_VALUE_NEW)));
+ consumer.addRecord(new ConsumerRecord<>(TOPIC, 1, 1, 0L, TimestampType.CREATE_TIME, 0, 0, TP1_KEY, TP1_VALUE_NEW,
+ new RecordHeaders(), Optional.empty())));
// Already have FutureCallback that should be invoked/awaited, so no need for follow up finishedLatch
});
@@ -357,8 +365,10 @@ public class KafkaBasedLogTest {
consumer.scheduleNopPollTask();
consumer.scheduleNopPollTask();
consumer.schedulePollTask(() -> {
- consumer.addRecord(new ConsumerRecord<>(TOPIC, 0, 0, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, TP0_KEY, TP0_VALUE_NEW));
- consumer.addRecord(new ConsumerRecord<>(TOPIC, 1, 0, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, TP0_KEY, TP0_VALUE_NEW));
+ consumer.addRecord(new ConsumerRecord<>(TOPIC, 0, 0, 0L, TimestampType.CREATE_TIME, 0, 0, TP0_KEY, TP0_VALUE_NEW,
+ new RecordHeaders(), Optional.empty()));
+ consumer.addRecord(new ConsumerRecord<>(TOPIC, 1, 0, 0L, TimestampType.CREATE_TIME, 0, 0, TP0_KEY, TP0_VALUE_NEW,
+ new RecordHeaders(), Optional.empty()));
});
consumer.schedulePollTask(finishedLatch::countDown);
@@ -411,8 +421,10 @@ public class KafkaBasedLogTest {
consumer.scheduleNopPollTask();
consumer.scheduleNopPollTask();
consumer.schedulePollTask(() -> {
- consumer.addRecord(new ConsumerRecord<>(TOPIC, 0, 0, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, TP0_KEY, TP0_VALUE));
- consumer.addRecord(new ConsumerRecord<>(TOPIC, 1, 0, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, TP0_KEY, TP0_VALUE_NEW));
+ consumer.addRecord(new ConsumerRecord<>(TOPIC, 0, 0, 0L, TimestampType.CREATE_TIME, 0, 0, TP0_KEY, TP0_VALUE,
+ new RecordHeaders(), Optional.empty()));
+ consumer.addRecord(new ConsumerRecord<>(TOPIC, 1, 0, 0L, TimestampType.CREATE_TIME, 0, 0, TP0_KEY, TP0_VALUE_NEW,
+ new RecordHeaders(), Optional.empty()));
});
consumer.schedulePollTask(finishedLatch::countDown);
diff --git a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
index ce58214..1269ff5 100755
--- a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
+++ b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
@@ -22,7 +22,7 @@ import java.nio.charset.StandardCharsets
import java.time.Duration
import java.util.concurrent.CountDownLatch
import java.util.regex.Pattern
-import java.util.{Collections, Locale, Map, Properties, Random}
+import java.util.{Collections, Locale, Map, Optional, Properties, Random}
import com.typesafe.scalalogging.LazyLogging
import joptsimple._
import kafka.utils.Implicits._
@@ -35,7 +35,6 @@ import org.apache.kafka.common.requests.ListOffsetsRequest
import org.apache.kafka.common.serialization.{ByteArrayDeserializer, Deserializer}
import org.apache.kafka.common.utils.Utils
-import scala.annotation.nowarn
import scala.jdk.CollectionConverters._
/**
@@ -112,8 +111,8 @@ object ConsoleConsumer extends Logging {
}
messageCount += 1
try {
- formatter.writeTo(new ConsumerRecord(msg.topic, msg.partition, msg.offset, msg.timestamp,
- msg.timestampType, 0, 0, 0, msg.key, msg.value, msg.headers), output)
+ formatter.writeTo(new ConsumerRecord(msg.topic, msg.partition, msg.offset, msg.timestamp, msg.timestampType,
+ 0, 0, msg.key, msg.value, msg.headers, Optional.empty[Integer]), output)
} catch {
case e: Throwable =>
if (skipMessageOnError) {
@@ -614,18 +613,3 @@ class NoOpMessageFormatter extends MessageFormatter {
def writeTo(consumerRecord: ConsumerRecord[Array[Byte], Array[Byte]], output: PrintStream): Unit = {}
}
-class ChecksumMessageFormatter extends MessageFormatter {
- private var topicStr: String = _
-
- override def configure(configs: Map[String, _]): Unit = {
- topicStr = if (configs.containsKey("topic"))
- configs.get("topic").toString + ":"
- else
- ""
- }
-
- @nowarn("cat=deprecation")
- def writeTo(consumerRecord: ConsumerRecord[Array[Byte], Array[Byte]], output: PrintStream): Unit = {
- output.println(topicStr + "checksum:" + consumerRecord.checksum)
- }
-}
diff --git a/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala b/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala
index 70188e4..1f7e9d1 100644
--- a/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala
+++ b/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala
@@ -35,7 +35,6 @@ import org.apache.kafka.common.{KafkaException, TopicPartition}
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
-import scala.annotation.nowarn
import scala.jdk.CollectionConverters._
import scala.collection.mutable.Buffer
import scala.concurrent.ExecutionException
@@ -102,7 +101,6 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
* 1. Send with null key/value/partition-id should be accepted; send with null topic should be rejected.
* 2. Last message of the non-blocking send should return the correct offset metadata
*/
- @nowarn("cat=deprecation")
@Test
def testSendOffset(): Unit = {
val producer = createProducer(brokerList)
@@ -123,7 +121,6 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
case 2 => assertEquals(metadata.serializedValueSize, "value".getBytes(StandardCharsets.UTF_8).length)
case _ => assertTrue(metadata.serializedValueSize > 0)
}
- assertNotEquals(metadata.checksum(), 0)
offset += 1
} else {
fail(s"Send callback returns the following exception: $exception")
diff --git a/core/src/test/scala/kafka/tools/DefaultMessageFormatterTest.scala b/core/src/test/scala/kafka/tools/DefaultMessageFormatterTest.scala
index ea72f4f..f2ca243 100644
--- a/core/src/test/scala/kafka/tools/DefaultMessageFormatterTest.scala
+++ b/core/src/test/scala/kafka/tools/DefaultMessageFormatterTest.scala
@@ -30,6 +30,7 @@ import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.{Arguments, MethodSource}
+import java.util.Optional
import scala.jdk.CollectionConverters._
class DefaultMessageFormatterTest {
@@ -210,12 +211,12 @@ object DefaultMessageFormatterTest {
offset,
timestamp,
timestampType,
- 0L,
0,
0,
if (key == null) null else key.getBytes(StandardCharsets.UTF_8),
if (value == null) null else value.getBytes(StandardCharsets.UTF_8),
- new RecordHeaders(headers.asJava))
+ new RecordHeaders(headers.asJava),
+ Optional.empty[Integer])
}
private def withResource[Resource <: Closeable, Result](resource: Resource)(handler: Resource => Result): Result = {
diff --git a/core/src/test/scala/unit/kafka/tools/ConsoleConsumerTest.scala b/core/src/test/scala/unit/kafka/tools/ConsoleConsumerTest.scala
index 17a7459..91ccbcc 100644
--- a/core/src/test/scala/unit/kafka/tools/ConsoleConsumerTest.scala
+++ b/core/src/test/scala/unit/kafka/tools/ConsoleConsumerTest.scala
@@ -19,8 +19,7 @@ package kafka.tools
import java.io.{ByteArrayOutputStream, PrintStream}
import java.nio.file.Files
-import java.util.{HashMap, Map => JMap}
-
+import java.util.{HashMap, Optional, Map => JMap}
import kafka.tools.ConsoleConsumer.ConsumerWrapper
import kafka.utils.{Exit, TestUtils}
import org.apache.kafka.clients.consumer.{ConsumerRecord, MockConsumer, OffsetResetStrategy}
@@ -31,6 +30,7 @@ import org.apache.kafka.test.MockDeserializer
import org.mockito.Mockito._
import org.mockito.ArgumentMatchers
import ArgumentMatchers._
+import org.apache.kafka.common.header.internals.RecordHeaders
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{BeforeEach, Test}
@@ -498,7 +498,8 @@ class ConsoleConsumerTest {
assertEquals("NO_TIMESTAMP\tPartition:0\tOffset:123\tkey\tvalue\n", out.toString)
out = new ByteArrayOutputStream()
- val record2 = new ConsumerRecord("topic", 0, 123, 123L, TimestampType.CREATE_TIME, 321L, -1, -1, "key".getBytes, "value".getBytes)
+ val record2 = new ConsumerRecord("topic", 0, 123, 123L, TimestampType.CREATE_TIME, -1, -1, "key".getBytes, "value".getBytes,
+ new RecordHeaders(), Optional.empty[Integer])
formatter.writeTo(record2, new PrintStream(out))
assertEquals("CreateTime:123\tPartition:0\tOffset:123\tkey\tvalue\n", out.toString)
formatter.close()
@@ -515,22 +516,4 @@ class ConsoleConsumerTest {
assertEquals("", out.toString)
}
- @Test
- def testChecksumMessageFormatter(): Unit = {
- val record = new ConsumerRecord("topic", 0, 123, "key".getBytes, "value".getBytes)
- val formatter = new ChecksumMessageFormatter()
- val configs: JMap[String, String] = new HashMap()
-
- formatter.configure(configs)
- var out = new ByteArrayOutputStream()
- formatter.writeTo(record, new PrintStream(out))
- assertEquals("checksum:-1\n", out.toString)
-
- configs.put("topic", "topic1")
- formatter.configure(configs)
- out = new ByteArrayOutputStream()
- formatter.writeTo(record, new PrintStream(out))
- assertEquals("topic1:checksum:-1\n", out.toString)
- }
-
}
diff --git a/docs/upgrade.html b/docs/upgrade.html
index dd83908..6a632e0 100644
--- a/docs/upgrade.html
+++ b/docs/upgrade.html
@@ -27,6 +27,8 @@
or updating the application not to use internal classes.</li>
<li>The Streams API removed all deprecated APIs that were deprecated in version 2.5.0 or earlier.
For a complete list of removed APIs compare the detailed Kafka Streams upgrade notes.</li>
+ <li>Kafka Streams no longer has a compile time dependency on "connect:json" module (<a href="https://issues.apache.org/jira/browse/KAFKA-5146">KAFKA-5146</a>).
+ Projects that were relying on this transitive dependency will have to explicitly declare it.</li>
<li>A number of deprecated classes, methods and tools have been removed from the <code>clients</code>, <code>core</code> and <code>tools</code> modules:</li>
<ul>
<li>The Scala <code>Authorizer</code>, <code>SimpleAclAuthorizer</code> and related classes have been removed. Please use the Java <code>Authorizer</code>
@@ -53,11 +55,15 @@
instead.</li>
<li>The Scala <code>kafka.common.MessageFormatter</code> was removed. Please use the Java <code>org.apache.kafka.common.MessageFormatter</code>.</li>
<li>The <code>MessageFormatter.init(Properties)</code> method was removed. Please use <code>configure(Map)</code> instead.</li>
- <li>The deprecated <code>org.apache.kafka.clients.consumer.internals.PartitionAssignor</code> class has been removed. Please use
+ <li>The <code>checksum()</code> method has been removed from <code>ConsumerRecord</code> and <code>RecordMetadata</code>. The message
+ format v2, which has been the default since 0.11, moved the checksum from the record to the record batch. As such, these methods
+ don't make sense and no replacements exist.</li>
+ <li>The <code>ChecksumMessageFormatter</code> class was removed. It is not part of the public API, but it may have been used
+ with <code>kafka-console-consumer.sh</code>. It reported the checksum of each record, which has not been supported
+ since message format v2.</li>
+ <li>The <code>org.apache.kafka.clients.consumer.internals.PartitionAssignor</code> class has been removed. Please use
<code>org.apache.kafka.clients.consumer.ConsumerPartitionAssignor</code> instead.</li>
- <li>Kafka Streams no longer has a compile time dependency on "connect:json" module (<a href="https://issues.apache.org/jira/browse/KAFKA-5146">KAFKA-5146</a>).
- Projects that were relying on this transitive dependency will have to explicitly declare it.</li>
- <li>The deprecated <code>quota.producer.default</code> and <code>quota.consumer.default</code> configurations were removed (<a href="https://issues.apache.org/jira/browse/KAFKA-12591">KAFKA-12591</a>).
+ <li>The <code>quota.producer.default</code> and <code>quota.consumer.default</code> configurations were removed (<a href="https://issues.apache.org/jira/browse/KAFKA-12591">KAFKA-12591</a>).
Dynamic quota defaults must be used instead.</li>
</ul>
</ul>
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordDeserializer.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordDeserializer.java
index 269dff2..20f1449 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordDeserializer.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordDeserializer.java
@@ -25,6 +25,8 @@ import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.slf4j.Logger;
+import java.util.Optional;
+
import static org.apache.kafka.streams.StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG;
class RecordDeserializer {
@@ -59,12 +61,12 @@ class RecordDeserializer {
rawRecord.offset(),
rawRecord.timestamp(),
TimestampType.CREATE_TIME,
- rawRecord.checksum(),
rawRecord.serializedKeySize(),
rawRecord.serializedValueSize(),
sourceNode.deserializeKey(rawRecord.topic(), rawRecord.headers(), rawRecord.key()),
sourceNode.deserializeValue(rawRecord.topic(), rawRecord.headers(), rawRecord.value()),
- rawRecord.headers()
+ rawRecord.headers(),
+ Optional.empty()
);
} catch (final Exception deserializationException) {
final DeserializationExceptionHandler.DeserializationHandlerResponse response;
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RecordConverters.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RecordConverters.java
index 8305d52..1f2e593 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RecordConverters.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RecordConverters.java
@@ -38,7 +38,6 @@ public final class RecordConverters {
record.offset(),
timestamp,
record.timestampType(),
- record.checksum(),
record.serializedKeySize(),
record.serializedValueSize(),
record.key(),
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/LogAndSkipOnInvalidTimestampTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/LogAndSkipOnInvalidTimestampTest.java
index 93ea448..5474f9f 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/LogAndSkipOnInvalidTimestampTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/LogAndSkipOnInvalidTimestampTest.java
@@ -17,9 +17,12 @@
package org.apache.kafka.streams.processor;
import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.record.TimestampType;
import org.junit.Test;
+import java.util.Optional;
+
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
@@ -44,9 +47,10 @@ public class LogAndSkipOnInvalidTimestampTest extends TimestampExtractorTest {
TimestampType.NO_TIMESTAMP_TYPE,
0,
0,
- 0,
null,
- null),
+ null,
+ new RecordHeaders(),
+ Optional.empty()),
0
);
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/TimestampExtractorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/TimestampExtractorTest.java
index bd235be..dca9a70 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/TimestampExtractorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/TimestampExtractorTest.java
@@ -17,8 +17,11 @@
package org.apache.kafka.streams.processor;
import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.record.TimestampType;
+import java.util.Optional;
+
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
@@ -36,9 +39,10 @@ class TimestampExtractorTest {
TimestampType.NO_TIMESTAMP_TYPE,
0,
0,
- 0,
null,
- null),
+ null,
+ new RecordHeaders(),
+ Optional.empty()),
0
);
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateTaskTest.java
index 3cc06be..e4bc600 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateTaskTest.java
@@ -18,6 +18,7 @@ package org.apache.kafka.streams.processor.internals;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.IntegerSerializer;
@@ -40,6 +41,7 @@ import java.io.File;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
+import java.util.Optional;
import java.util.Set;
import static java.util.Arrays.asList;
@@ -148,7 +150,7 @@ public class GlobalStateTaskTest {
final boolean failExpected) {
final ConsumerRecord<byte[], byte[]> record = new ConsumerRecord<>(
topic2, 1, 1, 0L, TimestampType.CREATE_TIME,
- 0L, 0, 0, key, recordValue
+ 0, 0, key, recordValue, new RecordHeaders(), Optional.empty()
);
globalStateTask.initialize();
try {
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordDeserializerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordDeserializerTest.java
index 4ae9d32..18d17ae 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordDeserializerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordDeserializerTest.java
@@ -26,6 +26,8 @@ import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.utils.LogContext;
import org.junit.Test;
+import java.util.Optional;
+
import static org.junit.Assert.assertEquals;
public class RecordDeserializerTest {
@@ -36,14 +38,13 @@ public class RecordDeserializerTest {
1,
10,
TimestampType.LOG_APPEND_TIME,
- 5L,
3,
5,
new byte[0],
new byte[0],
- headers);
+ headers,
+ Optional.empty());
- @SuppressWarnings("deprecation")
@Test
public void shouldReturnConsumerRecordWithDeserializedValueWhenNoExceptions() {
final RecordDeserializer recordDeserializer = new RecordDeserializer(
@@ -60,7 +61,6 @@ public class RecordDeserializerTest {
assertEquals(rawRecord.topic(), record.topic());
assertEquals(rawRecord.partition(), record.partition());
assertEquals(rawRecord.offset(), record.offset());
- assertEquals(rawRecord.checksum(), record.checksum());
assertEquals("key", record.key());
assertEquals("value", record.value());
assertEquals(rawRecord.timestamp(), record.timestamp());
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java
index 895e045..142e85a 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java
@@ -19,6 +19,7 @@ package org.apache.kafka.streams.processor.internals;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.SerializationException;
+import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.IntegerDeserializer;
@@ -45,6 +46,7 @@ import org.junit.Test;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
+import java.util.Optional;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
@@ -103,9 +105,9 @@ public class RecordQueueTest {
// add three 3 out-of-order records with timestamp 2, 1, 3
final List<ConsumerRecord<byte[], byte[]>> list1 = Arrays.asList(
- new ConsumerRecord<>("topic", 1, 2, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue),
- new ConsumerRecord<>("topic", 1, 1, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue),
- new ConsumerRecord<>("topic", 1, 3, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue));
+ new ConsumerRecord<>("topic", 1, 2, 0L, TimestampType.CREATE_TIME, 0, 0, recordKey, recordValue, new RecordHeaders(), Optional.empty()),
+ new ConsumerRecord<>("topic", 1, 1, 0L, TimestampType.CREATE_TIME, 0, 0, recordKey, recordValue, new RecordHeaders(), Optional.empty()),
+ new ConsumerRecord<>("topic", 1, 3, 0L, TimestampType.CREATE_TIME, 0, 0, recordKey, recordValue, new RecordHeaders(), Optional.empty()));
queue.addRawRecords(list1);
@@ -128,9 +130,9 @@ public class RecordQueueTest {
// add three 3 out-of-order records with timestamp 4, 1, 2
// now with 3, 4, 1, 2
final List<ConsumerRecord<byte[], byte[]>> list2 = Arrays.asList(
- new ConsumerRecord<>("topic", 1, 4, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue),
- new ConsumerRecord<>("topic", 1, 1, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue),
- new ConsumerRecord<>("topic", 1, 2, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue));
+ new ConsumerRecord<>("topic", 1, 4, 0L, TimestampType.CREATE_TIME, 0, 0, recordKey, recordValue, new RecordHeaders(), Optional.empty()),
+ new ConsumerRecord<>("topic", 1, 1, 0L, TimestampType.CREATE_TIME, 0, 0, recordKey, recordValue, new RecordHeaders(), Optional.empty()),
+ new ConsumerRecord<>("topic", 1, 2, 0L, TimestampType.CREATE_TIME, 0, 0, recordKey, recordValue, new RecordHeaders(), Optional.empty()));
queue.addRawRecords(list2);
@@ -161,9 +163,9 @@ public class RecordQueueTest {
// add three more records with 4, 5, 6
final List<ConsumerRecord<byte[], byte[]>> list3 = Arrays.asList(
- new ConsumerRecord<>("topic", 1, 4, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue),
- new ConsumerRecord<>("topic", 1, 5, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue),
- new ConsumerRecord<>("topic", 1, 6, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue));
+ new ConsumerRecord<>("topic", 1, 4, 0L, TimestampType.CREATE_TIME, 0, 0, recordKey, recordValue, new RecordHeaders(), Optional.empty()),
+ new ConsumerRecord<>("topic", 1, 5, 0L, TimestampType.CREATE_TIME, 0, 0, recordKey, recordValue, new RecordHeaders(), Optional.empty()),
+ new ConsumerRecord<>("topic", 1, 6, 0L, TimestampType.CREATE_TIME, 0, 0, recordKey, recordValue, new RecordHeaders(), Optional.empty()));
queue.addRawRecords(list3);
@@ -202,10 +204,14 @@ public class RecordQueueTest {
// add three 3 out-of-order records with timestamp 2, 1, 3, 4
final List<ConsumerRecord<byte[], byte[]>> list1 = Arrays.asList(
- new ConsumerRecord<>("topic", 1, 2, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue),
- new ConsumerRecord<>("topic", 1, 1, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue),
- new ConsumerRecord<>("topic", 1, 3, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue),
- new ConsumerRecord<>("topic", 1, 4, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue));
+ new ConsumerRecord<>("topic", 1, 2, 0L, TimestampType.CREATE_TIME, 0, 0, recordKey, recordValue,
+ new RecordHeaders(), Optional.empty()),
+ new ConsumerRecord<>("topic", 1, 1, 0L, TimestampType.CREATE_TIME, 0, 0, recordKey, recordValue,
+ new RecordHeaders(), Optional.empty()),
+ new ConsumerRecord<>("topic", 1, 3, 0L, TimestampType.CREATE_TIME, 0, 0, recordKey, recordValue,
+ new RecordHeaders(), Optional.empty()),
+ new ConsumerRecord<>("topic", 1, 4, 0L, TimestampType.CREATE_TIME, 0, 0, recordKey, recordValue,
+ new RecordHeaders(), Optional.empty()));
queue.addRawRecords(list1);
assertThat(queue.partitionTime(), is(RecordQueue.UNKNOWN));
@@ -231,10 +237,14 @@ public class RecordQueueTest {
assertThat(queue.partitionTime(), is(150L));
final List<ConsumerRecord<byte[], byte[]>> list1 = Arrays.asList(
- new ConsumerRecord<>("topic", 1, 200, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue),
- new ConsumerRecord<>("topic", 1, 100, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue),
- new ConsumerRecord<>("topic", 1, 300, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue),
- new ConsumerRecord<>("topic", 1, 400, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue));
+ new ConsumerRecord<>("topic", 1, 200, 0L, TimestampType.CREATE_TIME, 0, 0, recordKey, recordValue,
+ new RecordHeaders(), Optional.empty()),
+ new ConsumerRecord<>("topic", 1, 100, 0L, TimestampType.CREATE_TIME, 0, 0, recordKey, recordValue,
+ new RecordHeaders(), Optional.empty()),
+ new ConsumerRecord<>("topic", 1, 300, 0L, TimestampType.CREATE_TIME, 0, 0, recordKey, recordValue,
+ new RecordHeaders(), Optional.empty()),
+ new ConsumerRecord<>("topic", 1, 400, 0L, TimestampType.CREATE_TIME, 0, 0, recordKey, recordValue,
+ new RecordHeaders(), Optional.empty()));
queue.addRawRecords(list1);
assertThat(queue.partitionTime(), is(150L));
@@ -253,7 +263,8 @@ public class RecordQueueTest {
public void shouldThrowStreamsExceptionWhenKeyDeserializationFails() {
final byte[] key = Serdes.Long().serializer().serialize("foo", 1L);
final List<ConsumerRecord<byte[], byte[]>> records = Collections.singletonList(
- new ConsumerRecord<>("topic", 1, 1, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, key, recordValue));
+ new ConsumerRecord<>("topic", 1, 1, 0L, TimestampType.CREATE_TIME, 0, 0, key, recordValue,
+ new RecordHeaders(), Optional.empty()));
final StreamsException exception = assertThrows(
StreamsException.class,
@@ -266,7 +277,8 @@ public class RecordQueueTest {
public void shouldThrowStreamsExceptionWhenValueDeserializationFails() {
final byte[] value = Serdes.Long().serializer().serialize("foo", 1L);
final List<ConsumerRecord<byte[], byte[]>> records = Collections.singletonList(
- new ConsumerRecord<>("topic", 1, 1, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, value));
+ new ConsumerRecord<>("topic", 1, 1, 0L, TimestampType.CREATE_TIME, 0, 0, recordKey, value,
+ new RecordHeaders(), Optional.empty()));
final StreamsException exception = assertThrows(
StreamsException.class,
@@ -279,7 +291,8 @@ public class RecordQueueTest {
public void shouldNotThrowStreamsExceptionWhenKeyDeserializationFailsWithSkipHandler() {
final byte[] key = Serdes.Long().serializer().serialize("foo", 1L);
final List<ConsumerRecord<byte[], byte[]>> records = Collections.singletonList(
- new ConsumerRecord<>("topic", 1, 1, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, key, recordValue));
+ new ConsumerRecord<>("topic", 1, 1, 0L, TimestampType.CREATE_TIME, 0, 0, key, recordValue,
+ new RecordHeaders(), Optional.empty()));
queueThatSkipsDeserializeErrors.addRawRecords(records);
assertEquals(0, queueThatSkipsDeserializeErrors.size());
@@ -289,7 +302,8 @@ public class RecordQueueTest {
public void shouldNotThrowStreamsExceptionWhenValueDeserializationFailsWithSkipHandler() {
final byte[] value = Serdes.Long().serializer().serialize("foo", 1L);
final List<ConsumerRecord<byte[], byte[]>> records = Collections.singletonList(
- new ConsumerRecord<>("topic", 1, 1, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, value));
+ new ConsumerRecord<>("topic", 1, 1, 0L, TimestampType.CREATE_TIME, 0, 0, recordKey, value,
+ new RecordHeaders(), Optional.empty()));
queueThatSkipsDeserializeErrors.addRawRecords(records);
assertEquals(0, queueThatSkipsDeserializeErrors.size());
@@ -298,7 +312,8 @@ public class RecordQueueTest {
@Test
public void shouldThrowOnNegativeTimestamp() {
final List<ConsumerRecord<byte[], byte[]>> records = Collections.singletonList(
- new ConsumerRecord<>("topic", 1, 1, -1L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue));
+ new ConsumerRecord<>("topic", 1, 1, -1L, TimestampType.CREATE_TIME, 0, 0, recordKey, recordValue,
+ new RecordHeaders(), Optional.empty()));
final RecordQueue queue = new RecordQueue(
new TopicPartition("topic", 1),
@@ -323,7 +338,8 @@ public class RecordQueueTest {
@Test
public void shouldDropOnNegativeTimestamp() {
final List<ConsumerRecord<byte[], byte[]>> records = Collections.singletonList(
- new ConsumerRecord<>("topic", 1, 1, -1L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue));
+ new ConsumerRecord<>("topic", 1, 1, -1L, TimestampType.CREATE_TIME, 0, 0, recordKey, recordValue,
+ new RecordHeaders(), Optional.empty()));
final RecordQueue queue = new RecordQueue(
new TopicPartition("topic", 1),
@@ -355,10 +371,14 @@ public class RecordQueueTest {
// add four out-of-order records with timestamp 2, 1, 3, 4
final List<ConsumerRecord<byte[], byte[]>> list1 = Arrays.asList(
- new ConsumerRecord<>("topic", 1, 2, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue),
- new ConsumerRecord<>("topic", 1, 1, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue),
- new ConsumerRecord<>("topic", 1, 3, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue),
- new ConsumerRecord<>("topic", 1, 4, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue));
+ new ConsumerRecord<>("topic", 1, 2, 0L, TimestampType.CREATE_TIME, 0, 0, recordKey, recordValue,
+ new RecordHeaders(), Optional.empty()),
+ new ConsumerRecord<>("topic", 1, 1, 0L, TimestampType.CREATE_TIME, 0, 0, recordKey, recordValue,
+ new RecordHeaders(), Optional.empty()),
+ new ConsumerRecord<>("topic", 1, 3, 0L, TimestampType.CREATE_TIME, 0, 0, recordKey, recordValue,
+ new RecordHeaders(), Optional.empty()),
+ new ConsumerRecord<>("topic", 1, 4, 0L, TimestampType.CREATE_TIME, 0, 0, recordKey, recordValue,
+ new RecordHeaders(), Optional.empty()));
assertEquals(RecordQueue.UNKNOWN, timestampExtractor.partitionTime);
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
index 3a6a7c5..fa80e46 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
@@ -26,6 +26,7 @@ import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.metrics.JmxReporter;
import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.KafkaMetricsContext;
@@ -81,6 +82,7 @@ import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
@@ -2539,11 +2541,12 @@ public class StreamTaskTest {
offset,
offset, // use the offset as the timestamp
TimestampType.CREATE_TIME,
- 0L,
0,
0,
recordKey,
- intSerializer.serialize(null, value)
+ intSerializer.serialize(null, value),
+ new RecordHeaders(),
+ Optional.empty()
);
}
@@ -2555,11 +2558,12 @@ public class StreamTaskTest {
offset,
offset, // use the offset as the timestamp
TimestampType.CREATE_TIME,
- 0L,
0,
0,
recordKey,
- recordValue
+ recordValue,
+ new RecordHeaders(),
+ Optional.empty()
);
}
@@ -2570,11 +2574,12 @@ public class StreamTaskTest {
offset,
offset, // use the offset as the timestamp
TimestampType.CREATE_TIME,
- 0L,
0,
0,
new IntegerSerializer().serialize(topic1, key),
- recordValue
+ recordValue,
+ new RecordHeaders(),
+ Optional.empty()
);
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
index 82b1996..80fb2c9 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
@@ -37,6 +37,7 @@ import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.metrics.JmxReporter;
import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.KafkaMetricsContext;
@@ -1853,11 +1854,12 @@ public class StreamThreadTest {
100L,
100L,
TimestampType.CREATE_TIME,
- ConsumerRecord.NULL_CHECKSUM,
"K".getBytes().length,
"V".getBytes().length,
"K".getBytes(),
- "V".getBytes()));
+ "V".getBytes(),
+ new RecordHeaders(),
+ Optional.empty()));
thread.runOnce();
@@ -1940,11 +1942,12 @@ public class StreamThreadTest {
110L,
110L,
TimestampType.CREATE_TIME,
- ConsumerRecord.NULL_CHECKSUM,
"K".getBytes().length,
"V".getBytes().length,
"K".getBytes(),
- "V".getBytes()));
+ "V".getBytes(),
+ new RecordHeaders(),
+ Optional.empty()));
thread.runOnce();
@@ -2148,22 +2151,24 @@ public class StreamThreadTest {
++offset,
-1,
TimestampType.CREATE_TIME,
- ConsumerRecord.NULL_CHECKSUM,
-1,
-1,
new byte[0],
- "I am not an integer.".getBytes()));
+ "I am not an integer.".getBytes(),
+ new RecordHeaders(),
+ Optional.empty()));
mockConsumer.addRecord(new ConsumerRecord<>(
t1p1.topic(),
t1p1.partition(),
++offset,
-1,
TimestampType.CREATE_TIME,
- ConsumerRecord.NULL_CHECKSUM,
-1,
-1,
new byte[0],
- "I am not an integer.".getBytes()));
+ "I am not an integer.".getBytes(),
+ new RecordHeaders(),
+ Optional.empty()));
try (final LogCaptureAppender appender = LogCaptureAppender.createAndRegister(RecordDeserializer.class)) {
thread.runOnce();
@@ -2860,11 +2865,12 @@ public class StreamThreadTest {
offset,
timestamp,
TimestampType.CREATE_TIME,
- ConsumerRecord.NULL_CHECKSUM,
-1,
-1,
new byte[0],
- new byte[0]));
+ new byte[0],
+ new RecordHeaders(),
+ Optional.empty()));
}
StandbyTask standbyTask(final TaskManager taskManager, final TopicPartition partition) {
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/testutil/ConsumerRecordUtil.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/testutil/ConsumerRecordUtil.java
index a702fdc..4a6fbbd 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/testutil/ConsumerRecordUtil.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/testutil/ConsumerRecordUtil.java
@@ -17,8 +17,11 @@
package org.apache.kafka.streams.processor.internals.testutil;
import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.record.TimestampType;
+import java.util.Optional;
+
public final class ConsumerRecordUtil {
private ConsumerRecordUtil() {}
@@ -36,11 +39,12 @@ public final class ConsumerRecordUtil {
offset,
0L,
TimestampType.CREATE_TIME,
- 0L,
0,
0,
key,
- value
+ value,
+ new RecordHeaders(),
+ Optional.empty()
);
}
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RecordConvertersTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RecordConvertersTest.java
index bacbacd..e409ca1 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RecordConvertersTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RecordConvertersTest.java
@@ -17,10 +17,12 @@
package org.apache.kafka.streams.state.internals;
import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.record.TimestampType;
import org.junit.Test;
import java.nio.ByteBuffer;
+import java.util.Optional;
import static org.apache.kafka.streams.state.internals.RecordConverters.rawValueToTimestampedValue;
import static org.junit.Assert.assertArrayEquals;
@@ -41,7 +43,8 @@ public class RecordConvertersTest {
final long timestamp = 10L;
final byte[] value = new byte[1];
final ConsumerRecord<byte[], byte[]> inputRecord = new ConsumerRecord<>(
- "topic", 1, 0, timestamp, TimestampType.CREATE_TIME, 0L, 0, 0, new byte[0], value);
+ "topic", 1, 0, timestamp, TimestampType.CREATE_TIME, 0, 0, new byte[0], value,
+ new RecordHeaders(), Optional.empty());
final byte[] expectedValue = ByteBuffer.allocate(9).putLong(timestamp).put(value).array();
final byte[] actualValue = timestampedValueConverter.convert(inputRecord).value();
assertArrayEquals(expectedValue, actualValue);
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedKeyValueBufferTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedKeyValueBufferTest.java
index 0037fbb..7d420c7 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedKeyValueBufferTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedKeyValueBufferTest.java
@@ -47,6 +47,7 @@ import java.nio.ByteBuffer;
import java.util.Collection;
import java.util.LinkedList;
import java.util.List;
+import java.util.Optional;
import java.util.Properties;
import java.util.Random;
import java.util.concurrent.atomic.AtomicInteger;
@@ -387,9 +388,10 @@ public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer<S
TimestampType.CREATE_TIME,
-1,
-1,
- -1,
"todelete".getBytes(UTF_8),
- hexStringToByteArray(toDeleteBinaryValue)),
+ hexStringToByteArray(toDeleteBinaryValue),
+ new RecordHeaders(),
+ Optional.empty()),
new ConsumerRecord<>("changelog-topic",
0,
1,
@@ -397,9 +399,10 @@ public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer<S
TimestampType.CREATE_TIME,
-1,
-1,
- -1,
"asdf".getBytes(UTF_8),
- hexStringToByteArray(asdfBinaryValue)),
+ hexStringToByteArray(asdfBinaryValue),
+ new RecordHeaders(),
+ Optional.empty()),
new ConsumerRecord<>("changelog-topic",
0,
2,
@@ -407,9 +410,10 @@ public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer<S
TimestampType.CREATE_TIME,
-1,
-1,
- -1,
"zxcv".getBytes(UTF_8),
- hexStringToByteArray(zxcvBinaryValue1)),
+ hexStringToByteArray(zxcvBinaryValue1),
+ new RecordHeaders(),
+ Optional.empty()),
new ConsumerRecord<>("changelog-topic",
0,
3,
@@ -417,9 +421,10 @@ public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer<S
TimestampType.CREATE_TIME,
-1,
-1,
- -1,
"zxcv".getBytes(UTF_8),
- hexStringToByteArray(zxcvBinaryValue2))
+ hexStringToByteArray(zxcvBinaryValue2),
+ new RecordHeaders(),
+ Optional.empty())
));
assertThat(buffer.numRecords(), is(3));
@@ -434,9 +439,10 @@ public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer<S
TimestampType.CREATE_TIME,
-1,
-1,
- -1,
"todelete".getBytes(UTF_8),
- null)
+ null,
+ new RecordHeaders(),
+ Optional.empty())
));
assertThat(buffer.numRecords(), is(2));
@@ -501,45 +507,45 @@ public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer<S
0,
999,
TimestampType.CREATE_TIME,
- -1L,
-1,
-1,
"todelete".getBytes(UTF_8),
hexStringToByteArray(toDeleteBinary),
- v1FlagHeaders),
+ v1FlagHeaders,
+ Optional.empty()),
new ConsumerRecord<>("changelog-topic",
0,
1,
9999,
TimestampType.CREATE_TIME,
- -1L,
-1,
-1,
"asdf".getBytes(UTF_8),
hexStringToByteArray(asdfBinary),
- v1FlagHeaders),
+ v1FlagHeaders,
+ Optional.empty()),
new ConsumerRecord<>("changelog-topic",
0,
2,
99,
TimestampType.CREATE_TIME,
- -1L,
-1,
-1,
"zxcv".getBytes(UTF_8),
hexStringToByteArray(zxcvBinary1),
- v1FlagHeaders),
+ v1FlagHeaders,
+ Optional.empty()),
new ConsumerRecord<>("changelog-topic",
0,
3,
100,
TimestampType.CREATE_TIME,
- -1L,
-1,
-1,
"zxcv".getBytes(UTF_8),
hexStringToByteArray(zxcvBinary2),
- v1FlagHeaders)
+ v1FlagHeaders,
+ Optional.empty())
));
assertThat(buffer.numRecords(), is(3));
@@ -552,11 +558,12 @@ public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer<S
3,
3,
TimestampType.CREATE_TIME,
- -1L,
-1,
-1,
"todelete".getBytes(UTF_8),
- null)
+ null,
+ new RecordHeaders(),
+ Optional.empty())
));
assertThat(buffer.numRecords(), is(2));
@@ -622,45 +629,45 @@ public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer<S
0,
999,
TimestampType.CREATE_TIME,
- -1L,
-1,
-1,
"todelete".getBytes(UTF_8),
hexStringToByteArray(toDeleteBinary),
- v2FlagHeaders),
+ v2FlagHeaders,
+ Optional.empty()),
new ConsumerRecord<>("changelog-topic",
0,
1,
9999,
TimestampType.CREATE_TIME,
- -1L,
-1,
-1,
"asdf".getBytes(UTF_8),
hexStringToByteArray(asdfBinary),
- v2FlagHeaders),
+ v2FlagHeaders,
+ Optional.empty()),
new ConsumerRecord<>("changelog-topic",
0,
2,
99,
TimestampType.CREATE_TIME,
- -1L,
-1,
-1,
"zxcv".getBytes(UTF_8),
hexStringToByteArray(zxcvBinary1),
- v2FlagHeaders),
+ v2FlagHeaders,
+ Optional.empty()),
new ConsumerRecord<>("changelog-topic",
0,
2,
100,
TimestampType.CREATE_TIME,
- -1L,
-1,
-1,
"zxcv".getBytes(UTF_8),
hexStringToByteArray(zxcvBinary2),
- v2FlagHeaders)
+ v2FlagHeaders,
+ Optional.empty())
));
assertThat(buffer.numRecords(), is(3));
@@ -673,11 +680,12 @@ public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer<S
3,
3,
TimestampType.CREATE_TIME,
- -1L,
-1,
-1,
"todelete".getBytes(UTF_8),
- null)
+ null,
+ new RecordHeaders(),
+ Optional.empty())
));
assertThat(buffer.numRecords(), is(2));
@@ -745,45 +753,45 @@ public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer<S
0,
999,
TimestampType.CREATE_TIME,
- -1L,
-1,
-1,
"todelete".getBytes(UTF_8),
hexStringToByteArray(toDeleteBinary),
- headers),
+ headers,
+ Optional.empty()),
new ConsumerRecord<>("changelog-topic",
0,
1,
9999,
TimestampType.CREATE_TIME,
- -1L,
-1,
-1,
"asdf".getBytes(UTF_8),
hexStringToByteArray(asdfBinary),
- headers),
+ headers,
+ Optional.empty()),
new ConsumerRecord<>("changelog-topic",
0,
2,
99,
TimestampType.CREATE_TIME,
- -1L,
-1,
-1,
"zxcv".getBytes(UTF_8),
hexStringToByteArray(zxcvBinary1),
- headers),
+ headers,
+ Optional.empty()),
new ConsumerRecord<>("changelog-topic",
0,
2,
100,
TimestampType.CREATE_TIME,
- -1L,
-1,
-1,
"zxcv".getBytes(UTF_8),
hexStringToByteArray(zxcvBinary2),
- headers)
+ headers,
+ Optional.empty())
));
assertThat(buffer.numRecords(), is(3));
@@ -796,11 +804,12 @@ public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer<S
3,
3,
TimestampType.CREATE_TIME,
- -1L,
-1,
-1,
"todelete".getBytes(UTF_8),
- null)
+ null,
+ new RecordHeaders(),
+ Optional.empty())
));
assertThat(buffer.numRecords(), is(2));
@@ -865,45 +874,45 @@ public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer<S
0,
999,
TimestampType.CREATE_TIME,
- -1L,
-1,
-1,
"todelete".getBytes(UTF_8),
hexStringToByteArray(toDeleteBinary),
- headers),
+ headers,
+ Optional.empty()),
new ConsumerRecord<>("changelog-topic",
0,
1,
9999,
TimestampType.CREATE_TIME,
- -1L,
-1,
-1,
"asdf".getBytes(UTF_8),
hexStringToByteArray(asdfBinary),
- headers),
+ headers,
+ Optional.empty()),
new ConsumerRecord<>("changelog-topic",
0,
2,
99,
TimestampType.CREATE_TIME,
- -1L,
-1,
-1,
"zxcv".getBytes(UTF_8),
hexStringToByteArray(zxcvBinary1),
- headers),
+ headers,
+ Optional.empty()),
new ConsumerRecord<>("changelog-topic",
0,
2,
100,
TimestampType.CREATE_TIME,
- -1L,
-1,
-1,
"zxcv".getBytes(UTF_8),
hexStringToByteArray(zxcvBinary2),
- headers)
+ headers,
+ Optional.empty())
));
assertThat(buffer.numRecords(), is(3));
@@ -916,11 +925,12 @@ public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer<S
3,
3,
TimestampType.CREATE_TIME,
- -1L,
-1,
-1,
"todelete".getBytes(UTF_8),
- null)
+ null,
+ new RecordHeaders(),
+ Optional.empty())
));
assertThat(buffer.numRecords(), is(2));
@@ -979,12 +989,12 @@ public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer<S
0,
999,
TimestampType.CREATE_TIME,
- -1L,
-1,
-1,
"todelete".getBytes(UTF_8),
ByteBuffer.allocate(Long.BYTES + todeleteValue.length).putLong(0L).put(todeleteValue).array(),
- unknownFlagHeaders)
+ unknownFlagHeaders,
+ Optional.empty())
));
fail("expected an exception");
} catch (final IllegalArgumentException expected) {
diff --git a/streams/src/test/java/org/apache/kafka/test/MockRestoreConsumer.java b/streams/src/test/java/org/apache/kafka/test/MockRestoreConsumer.java
index 244b35f..2398872 100644
--- a/streams/src/test/java/org/apache/kafka/test/MockRestoreConsumer.java
+++ b/streams/src/test/java/org/apache/kafka/test/MockRestoreConsumer.java
@@ -28,6 +28,7 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
+import java.util.Optional;
public class MockRestoreConsumer<K, V> extends MockConsumer<byte[], byte[]> {
private final Serializer<K> keySerializer;
@@ -60,10 +61,10 @@ public class MockRestoreConsumer<K, V> extends MockConsumer<byte[], byte[]> {
public void bufferRecord(final ConsumerRecord<K, V> record) {
recordBuffer.add(
new ConsumerRecord<>(record.topic(), record.partition(), record.offset(), record.timestamp(),
- record.timestampType(), 0L, 0, 0,
+ record.timestampType(), 0, 0,
keySerializer.serialize(record.topic(), record.headers(), record.key()),
valueSerializer.serialize(record.topic(), record.headers(), record.value()),
- record.headers()));
+ record.headers(), Optional.empty()));
endOffset = record.offset();
super.updateEndOffsets(Collections.singletonMap(assignedPartition, endOffset));
diff --git a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
index f657209..bd93367 100644
--- a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
+++ b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
@@ -101,6 +101,7 @@ import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Objects;
+import java.util.Optional;
import java.util.Properties;
import java.util.Queue;
import java.util.Set;
@@ -606,12 +607,12 @@ public class TopologyTestDriver implements Closeable {
offset,
timestamp,
TimestampType.CREATE_TIME,
- (long) ConsumerRecord.NULL_CHECKSUM,
key == null ? ConsumerRecord.NULL_SIZE : key.length,
value == null ? ConsumerRecord.NULL_SIZE : value.length,
key,
value,
- headers))
+ headers,
+ Optional.empty()))
);
}
@@ -663,12 +664,12 @@ public class TopologyTestDriver implements Closeable {
offsetsByTopicOrPatternPartition.get(globalInputTopicPartition).incrementAndGet() - 1,
timestamp,
TimestampType.CREATE_TIME,
- (long) ConsumerRecord.NULL_CHECKSUM,
key == null ? ConsumerRecord.NULL_SIZE : key.length,
value == null ? ConsumerRecord.NULL_SIZE : value.length,
key,
value,
- headers)
+ headers,
+ Optional.empty())
);
globalStateTask.flushState();
}
diff --git a/streams/test-utils/src/main/java/org/apache/kafka/streams/test/ConsumerRecordFactory.java b/streams/test-utils/src/main/java/org/apache/kafka/streams/test/ConsumerRecordFactory.java
index ae71de8..15164f6 100644
--- a/streams/test-utils/src/main/java/org/apache/kafka/streams/test/ConsumerRecordFactory.java
+++ b/streams/test-utils/src/main/java/org/apache/kafka/streams/test/ConsumerRecordFactory.java
@@ -28,6 +28,7 @@ import org.apache.kafka.streams.TopologyTestDriver;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
+import java.util.Optional;
/**
* Factory to create {@link ConsumerRecord consumer records} for a single single-partitioned topic with given key and
@@ -190,12 +191,12 @@ public class ConsumerRecordFactory<K, V> {
-1L,
timestampMs,
TimestampType.CREATE_TIME,
- (long) ConsumerRecord.NULL_CHECKSUM,
serializedKey == null ? 0 : serializedKey.length,
serializedValue == null ? 0 : serializedValue.length,
serializedKey,
serializedValue,
- headers);
+ headers,
+ Optional.empty());
}
/**
diff --git a/streams/test-utils/src/test/java/org/apache/kafka/streams/test/TestRecordTest.java b/streams/test-utils/src/test/java/org/apache/kafka/streams/test/TestRecordTest.java
index b16ea03..ad3b1a2 100644
--- a/streams/test-utils/src/test/java/org/apache/kafka/streams/test/TestRecordTest.java
+++ b/streams/test-utils/src/test/java/org/apache/kafka/streams/test/TestRecordTest.java
@@ -27,6 +27,7 @@ import org.apache.kafka.common.record.TimestampType;
import org.junit.jupiter.api.Test;
import java.time.Instant;
+import java.util.Optional;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;
@@ -152,8 +153,8 @@ public class TestRecordTest {
@Test
public void testConsumerRecord() {
final String topicName = "topic";
- final ConsumerRecord<String, Integer> consumerRecord =
- new ConsumerRecord<>(topicName, 1, 0, recordMs, TimestampType.CREATE_TIME, 0L, 0, 0, key, value, headers);
+ final ConsumerRecord<String, Integer> consumerRecord = new ConsumerRecord<>(topicName, 1, 0, recordMs,
+ TimestampType.CREATE_TIME, 0, 0, key, value, headers, Optional.empty());
final TestRecord<String, Integer> testRecord = new TestRecord<>(consumerRecord);
final TestRecord<String, Integer> expectedRecord = new TestRecord<>(key, value, headers, recordTime);
assertEquals(expectedRecord, testRecord);