Posted to commits@kafka.apache.org by ij...@apache.org on 2021/04/14 21:40:55 UTC

[kafka] branch trunk updated: KAFKA-12612: Remove `checksum` from ConsumerRecord/RecordMetadata for 3.0 (#10470)

This is an automated email from the ASF dual-hosted git repository.

ijuma pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 89933f2  KAFKA-12612: Remove `checksum` from ConsumerRecord/RecordMetadata for 3.0 (#10470)
89933f2 is described below

commit 89933f21f204abf75336464d3ac24a4fdd254628
Author: Ismael Juma <is...@juma.me.uk>
AuthorDate: Wed Apr 14 14:38:37 2021 -0700

    KAFKA-12612: Remove `checksum` from ConsumerRecord/RecordMetadata for 3.0 (#10470)
    
    The methods have been deprecated since 0.11 without replacement because
    message format v2 moved the checksum from the record to the record batch.
    
    Unfortunately, we did not deprecate the constructors that take a checksum
    (even though we intended to), so we cannot remove them yet. I have deprecated
    them for removal in 4.0 and added a single non-deprecated constructor to each
    of `ConsumerRecord` and `RecordMetadata` that takes all remaining parameters
    (see the usage sketch below). `ConsumerRecord` could use one additional
    convenience constructor, but that requires a KIP and hence should be done
    separately.
    
    Also:
    * Removed `ChecksumMessageFormatter`, which is technically not public
    API, but may have been used with the console consumer.
    * Updated all usages of `ConsumerRecord`/`RecordMetadata` constructors
    to use the non-deprecated ones.
    * Added tests for the deprecated `ConsumerRecord`/`RecordMetadata`
    constructors.
    
    Reviewers: Chia-Ping Tsai <ch...@gmail.com>, David Jacot <dj...@confluent.io>
---
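
For reference, a minimal usage sketch of the two new non-deprecated
constructors. Both signatures appear in the ConsumerRecord.java and
RecordMetadata.java diffs below; the literal values here are made up for
illustration:

    import java.util.Optional;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.producer.RecordMetadata;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.header.internals.RecordHeaders;
    import org.apache.kafka.common.record.TimestampType;

    // ConsumerRecord: no checksum parameter; headers and leaderEpoch are explicit.
    ConsumerRecord<String, String> record = new ConsumerRecord<>(
        "topic", 0, 23L, 1618433917000L, TimestampType.CREATE_TIME,
        3, 5,                               // serializedKeySize, serializedValueSize
        "key", "value",
        new RecordHeaders(), Optional.empty());

    // RecordMetadata: the former `long relativeOffset` is now an `int batchIndex`,
    // and the checksum parameter is gone.
    RecordMetadata metadata = new RecordMetadata(
        new TopicPartition("topic", 0), 15L, 3, 1618433917000L, 3, 5);
    // metadata.offset() == baseOffset + batchIndex == 18
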
 .../kafka/clients/consumer/ConsumerRecord.java     | 115 ++++++++++-------
 .../kafka/clients/consumer/internals/Fetcher.java  |   2 +-
 .../kafka/clients/producer/KafkaProducer.java      |   2 +-
 .../kafka/clients/producer/MockProducer.java       |  10 +-
 .../kafka/clients/producer/RecordMetadata.java     |  53 ++++----
 .../producer/internals/FutureRecordMetadata.java   |  12 +-
 .../clients/producer/internals/ProducerBatch.java  |   6 +-
 .../producer/internals/ProducerInterceptors.java   |   2 +-
 .../apache/kafka/common/record/DefaultRecord.java  |  12 --
 .../kafka/common/record/MemoryRecordsBuilder.java  |  79 +++++-------
 .../kafka/clients/consumer/ConsumerRecordTest.java |  68 ++++++++--
 .../clients/consumer/ConsumerRecordsTest.java      |  11 +-
 .../kafka/clients/consumer/MockConsumerTest.java   |  13 +-
 .../internals/ConsumerInterceptorsTest.java        |  14 ++-
 .../kafka/clients/producer/RecordMetadataTest.java |  47 ++++---
 .../kafka/clients/producer/RecordSendTest.java     |   6 +-
 .../internals/FutureRecordMetadataTest.java        |   1 -
 .../producer/internals/ProducerBatchTest.java      |  26 ----
 .../internals/ProducerInterceptorsTest.java        |   2 +-
 .../common/record/MemoryRecordsBuilderTest.java    |   3 +-
 .../apache/kafka/test/MockConsumerInterceptor.java |  10 +-
 .../kafka/connect/mirror/MirrorSourceTaskTest.java |   7 +-
 .../runtime/errors/WorkerErrantRecordReporter.java |   5 +-
 .../kafka/connect/runtime/WorkerSinkTaskTest.java  |  14 ++-
 .../runtime/WorkerSinkTaskThreadedTest.java        |  15 ++-
 .../connect/runtime/WorkerSourceTaskTest.java      |   2 +-
 .../runtime/errors/ProcessingContextTest.java      |   2 +-
 .../storage/KafkaConfigBackingStoreTest.java       | 138 ++++++++++++++-------
 .../storage/KafkaOffsetBackingStoreTest.java       |  38 ++++--
 .../storage/KafkaStatusBackingStoreTest.java       |   4 +-
 .../kafka/connect/util/KafkaBasedLogTest.java      |  32 +++--
 .../main/scala/kafka/tools/ConsoleConsumer.scala   |  22 +---
 .../kafka/api/BaseProducerSendTest.scala           |   3 -
 .../kafka/tools/DefaultMessageFormatterTest.scala  |   5 +-
 .../unit/kafka/tools/ConsoleConsumerTest.scala     |  25 +---
 docs/upgrade.html                                  |  14 ++-
 .../processor/internals/RecordDeserializer.java    |   6 +-
 .../streams/state/internals/RecordConverters.java  |   1 -
 .../LogAndSkipOnInvalidTimestampTest.java          |   8 +-
 .../streams/processor/TimestampExtractorTest.java  |   8 +-
 .../processor/internals/GlobalStateTaskTest.java   |   4 +-
 .../internals/RecordDeserializerTest.java          |   8 +-
 .../processor/internals/RecordQueueTest.java       |  74 +++++++----
 .../processor/internals/StreamTaskTest.java        |  17 ++-
 .../processor/internals/StreamThreadTest.java      |  26 ++--
 .../internals/testutil/ConsumerRecordUtil.java     |   8 +-
 .../state/internals/RecordConvertersTest.java      |   5 +-
 .../internals/TimeOrderedKeyValueBufferTest.java   | 114 +++++++++--------
 .../org/apache/kafka/test/MockRestoreConsumer.java |   5 +-
 .../apache/kafka/streams/TopologyTestDriver.java   |   9 +-
 .../kafka/streams/test/ConsumerRecordFactory.java  |   5 +-
 .../apache/kafka/streams/test/TestRecordTest.java  |   5 +-
 52 files changed, 636 insertions(+), 487 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecord.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecord.java
index a7dad7b..2ae93e8 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecord.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecord.java
@@ -18,7 +18,6 @@ package org.apache.kafka.clients.consumer;
 
 import org.apache.kafka.common.header.Headers;
 import org.apache.kafka.common.header.internals.RecordHeaders;
-import org.apache.kafka.common.record.DefaultRecord;
 import org.apache.kafka.common.record.RecordBatch;
 import org.apache.kafka.common.record.TimestampType;
 
@@ -32,6 +31,12 @@ import java.util.Optional;
 public class ConsumerRecord<K, V> {
     public static final long NO_TIMESTAMP = RecordBatch.NO_TIMESTAMP;
     public static final int NULL_SIZE = -1;
+
+    /**
+     * @deprecated checksums are no longer exposed by this class, this constant will be removed in Apache Kafka 4.0
+     *             (deprecated since 3.0).
+     */
+    @Deprecated
     public static final int NULL_CHECKSUM = -1;
 
     private final String topic;
@@ -46,8 +51,6 @@ public class ConsumerRecord<K, V> {
     private final V value;
     private final Optional<Integer> leaderEpoch;
 
-    private volatile Long checksum;
-
     /**
      * Creates a record to be received from a specified topic and partition (provided for
      * compatibility with Kafka 0.9 before the message format supported timestamps and before
@@ -64,8 +67,52 @@ public class ConsumerRecord<K, V> {
                           long offset,
                           K key,
                           V value) {
-        this(topic, partition, offset, NO_TIMESTAMP, TimestampType.NO_TIMESTAMP_TYPE,
-                NULL_CHECKSUM, NULL_SIZE, NULL_SIZE, key, value);
+        this(topic, partition, offset, NO_TIMESTAMP, TimestampType.NO_TIMESTAMP_TYPE, NULL_SIZE, NULL_SIZE, key, value,
+            new RecordHeaders(), Optional.empty());
+    }
+
+    /**
+     * Creates a record to be received from a specified topic and partition
+     *
+     * @param topic The topic this record is received from
+     * @param partition The partition of the topic this record is received from
+     * @param offset The offset of this record in the corresponding Kafka partition
+     * @param timestamp The timestamp of the record.
+     * @param timestampType The timestamp type
+     * @param serializedKeySize The length of the serialized key
+     * @param serializedValueSize The length of the serialized value
+     * @param key The key of the record, if one exists (null is allowed)
+     * @param value The record contents
+     * @param headers The headers of the record
+     * @param leaderEpoch Optional leader epoch of the record (may be empty for legacy record formats)
+     */
+    public ConsumerRecord(String topic,
+                          int partition,
+                          long offset,
+                          long timestamp,
+                          TimestampType timestampType,
+                          int serializedKeySize,
+                          int serializedValueSize,
+                          K key,
+                          V value,
+                          Headers headers,
+                          Optional<Integer> leaderEpoch) {
+        if (topic == null)
+            throw new IllegalArgumentException("Topic cannot be null");
+        if (headers == null)
+            throw new IllegalArgumentException("Headers cannot be null");
+
+        this.topic = topic;
+        this.partition = partition;
+        this.offset = offset;
+        this.timestamp = timestamp;
+        this.timestampType = timestampType;
+        this.serializedKeySize = serializedKeySize;
+        this.serializedValueSize = serializedValueSize;
+        this.key = key;
+        this.value = value;
+        this.headers = headers;
+        this.leaderEpoch = leaderEpoch;
     }
 
     /**
@@ -77,12 +124,15 @@ public class ConsumerRecord<K, V> {
      * @param offset The offset of this record in the corresponding Kafka partition
      * @param timestamp The timestamp of the record.
      * @param timestampType The timestamp type
-     * @param checksum The checksum (CRC32) of the full record
      * @param serializedKeySize The length of the serialized key
      * @param serializedValueSize The length of the serialized value
      * @param key The key of the record, if one exists (null is allowed)
      * @param value The record contents
+     *
+     * @deprecated use one of the constructors without a `checksum` parameter. This constructor will be removed in
+     *             Apache Kafka 4.0 (deprecated since 3.0).
      */
+    @Deprecated
     public ConsumerRecord(String topic,
                           int partition,
                           long offset,
@@ -93,8 +143,8 @@ public class ConsumerRecord<K, V> {
                           int serializedValueSize,
                           K key,
                           V value) {
-        this(topic, partition, offset, timestamp, timestampType, checksum, serializedKeySize, serializedValueSize,
-                key, value, new RecordHeaders());
+        this(topic, partition, offset, timestamp, timestampType, serializedKeySize, serializedValueSize,
+                key, value, new RecordHeaders(), Optional.empty());
     }
 
     /**
@@ -105,13 +155,16 @@ public class ConsumerRecord<K, V> {
      * @param offset The offset of this record in the corresponding Kafka partition
      * @param timestamp The timestamp of the record.
      * @param timestampType The timestamp type
-     * @param checksum The checksum (CRC32) of the full record
      * @param serializedKeySize The length of the serialized key
      * @param serializedValueSize The length of the serialized value
      * @param key The key of the record, if one exists (null is allowed)
      * @param value The record contents
      * @param headers The headers of the record.
+     *
+     * @deprecated use one of the constructors without a `checksum` parameter. This constructor will be removed in
+     *             Apache Kafka 4.0 (deprecated since 3.0).
      */
+    @Deprecated
     public ConsumerRecord(String topic,
                           int partition,
                           long offset,
@@ -123,7 +176,7 @@ public class ConsumerRecord<K, V> {
                           K key,
                           V value,
                           Headers headers) {
-        this(topic, partition, offset, timestamp, timestampType, checksum, serializedKeySize, serializedValueSize,
+        this(topic, partition, offset, timestamp, timestampType, serializedKeySize, serializedValueSize,
                 key, value, headers, Optional.empty());
     }
 
@@ -135,14 +188,17 @@ public class ConsumerRecord<K, V> {
      * @param offset The offset of this record in the corresponding Kafka partition
      * @param timestamp The timestamp of the record.
      * @param timestampType The timestamp type
-     * @param checksum The checksum (CRC32) of the full record
      * @param serializedKeySize The length of the serialized key
      * @param serializedValueSize The length of the serialized value
      * @param key The key of the record, if one exists (null is allowed)
      * @param value The record contents
      * @param headers The headers of the record
      * @param leaderEpoch Optional leader epoch of the record (may be empty for legacy record formats)
+     *
+     * @deprecated use one of the constructors without a `checksum` parameter. This constructor will be removed in
+     *             Apache Kafka 4.0 (deprecated since 3.0).
      */
+    @Deprecated
     public ConsumerRecord(String topic,
                           int partition,
                           long offset,
@@ -155,23 +211,8 @@ public class ConsumerRecord<K, V> {
                           V value,
                           Headers headers,
                           Optional<Integer> leaderEpoch) {
-        if (topic == null)
-            throw new IllegalArgumentException("Topic cannot be null");
-        if (headers == null)
-            throw new IllegalArgumentException("Headers cannot be null");
-
-        this.topic = topic;
-        this.partition = partition;
-        this.offset = offset;
-        this.timestamp = timestamp;
-        this.timestampType = timestampType;
-        this.checksum = checksum;
-        this.serializedKeySize = serializedKeySize;
-        this.serializedValueSize = serializedValueSize;
-        this.key = key;
-        this.value = value;
-        this.headers = headers;
-        this.leaderEpoch = leaderEpoch;
+        this(topic, partition, offset, timestamp, timestampType, serializedKeySize, serializedValueSize, key, value, headers,
+            leaderEpoch);
     }
 
     /**
@@ -231,24 +272,6 @@ public class ConsumerRecord<K, V> {
     }
 
     /**
-     * The checksum (CRC32) of the record.
-     *
-     * @deprecated As of Kafka 0.11.0. Because of the potential for message format conversion on the broker, the
-     *             checksum returned by the broker may not match what was computed by the producer.
-     *             It is therefore unsafe to depend on this checksum for end-to-end delivery guarantees. Additionally,
-     *             message format v2 does not include a record-level checksum (for performance, the record checksum
-     *             was replaced with a batch checksum). To maintain compatibility, a partial checksum computed from
-     *             the record timestamp, serialized key size, and serialized value size is returned instead, but
-     *             this should not be depended on for end-to-end reliability.
-     */
-    @Deprecated
-    public long checksum() {
-        if (checksum == null)
-            this.checksum = DefaultRecord.computePartialChecksum(timestamp, serializedKeySize, serializedValueSize);
-        return this.checksum;
-    }
-
-    /**
      * The size of the serialized, uncompressed key in bytes. If key is null, the returned size
      * is -1.
      */
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
index 8f9b7a2..fa214ff 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
@@ -1385,7 +1385,7 @@ public class Fetcher<K, V> implements Closeable {
             byte[] valueByteArray = valueBytes == null ? null : Utils.toArray(valueBytes);
             V value = valueBytes == null ? null : this.valueDeserializer.deserialize(partition.topic(), headers, valueByteArray);
             return new ConsumerRecord<>(partition.topic(), partition.partition(), offset,
-                                        timestamp, timestampType, record.checksumOrNull(),
+                                        timestamp, timestampType,
                                         keyByteArray == null ? ConsumerRecord.NULL_SIZE : keyByteArray.length,
                                         valueByteArray == null ? ConsumerRecord.NULL_SIZE : valueByteArray.length,
                                         key, value, headers, leaderEpoch);
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index 0b01a86..7f573d9 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -1359,7 +1359,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
         }
 
         public void onCompletion(RecordMetadata metadata, Exception exception) {
-            metadata = metadata != null ? metadata : new RecordMetadata(tp, -1, -1, RecordBatch.NO_TIMESTAMP, -1L, -1, -1);
+            metadata = metadata != null ? metadata : new RecordMetadata(tp, -1, -1, RecordBatch.NO_TIMESTAMP, -1, -1);
             this.interceptors.onAcknowledgement(metadata, exception);
             if (this.userCallback != null)
                 this.userCallback.onCompletion(metadata, exception);
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java
index 650b697..f733869 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java
@@ -311,10 +311,12 @@ public class MockProducer<K, V> implements Producer<K, V> {
         TopicPartition topicPartition = new TopicPartition(record.topic(), partition);
         ProduceRequestResult result = new ProduceRequestResult(topicPartition);
         FutureRecordMetadata future = new FutureRecordMetadata(result, 0, RecordBatch.NO_TIMESTAMP,
-                0L, 0, 0, Time.SYSTEM);
+                0, 0, Time.SYSTEM);
         long offset = nextOffset(topicPartition);
-        Completion completion = new Completion(offset, new RecordMetadata(topicPartition, 0, offset,
-                RecordBatch.NO_TIMESTAMP, 0L, 0, 0), result, callback, topicPartition);
+        long baseOffset = Math.max(0, offset - Integer.MAX_VALUE);
+        int batchIndex = (int) Math.min(Integer.MAX_VALUE, offset);
+        Completion completion = new Completion(offset, new RecordMetadata(topicPartition, baseOffset, batchIndex,
+                RecordBatch.NO_TIMESTAMP, 0, 0), result, callback, topicPartition);
 
         if (!this.transactionInFlight)
             this.sent.add(record);
@@ -537,7 +539,7 @@ public class MockProducer<K, V> implements Producer<K, V> {
                 if (e == null)
                     callback.onCompletion(metadata, null);
                 else
-                    callback.onCompletion(new RecordMetadata(tp, -1, -1, RecordBatch.NO_TIMESTAMP, -1L, -1, -1), e);
+                    callback.onCompletion(new RecordMetadata(tp, -1, -1, RecordBatch.NO_TIMESTAMP, -1, -1), e);
             }
             result.done();
         }
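
A note on the MockProducer change above: RecordMetadata now takes an `int`
batchIndex, so the mock's monotonically increasing `long` offset is split such
that `baseOffset + batchIndex` always reconstructs the original offset while
batchIndex stays within int range. A quick check of the arithmetic:

    long offset = 5_000_000_000L;                                // > Integer.MAX_VALUE
    long baseOffset = Math.max(0, offset - Integer.MAX_VALUE);   // 2_852_516_353
    int batchIndex = (int) Math.min(Integer.MAX_VALUE, offset);  // 2_147_483_647
    assert baseOffset + batchIndex == offset;

For offsets at or below Integer.MAX_VALUE, baseOffset is simply 0 and
batchIndex is the offset itself.
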
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/RecordMetadata.java b/clients/src/main/java/org/apache/kafka/clients/producer/RecordMetadata.java
index 1ae5cf6..8efc4c4 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/RecordMetadata.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/RecordMetadata.java
@@ -17,7 +17,6 @@
 package org.apache.kafka.clients.producer;
 
 import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.record.DefaultRecord;
 import org.apache.kafka.common.record.RecordBatch;
 import org.apache.kafka.common.requests.ProduceResponse;
 
@@ -42,21 +41,38 @@ public final class RecordMetadata {
     private final int serializedValueSize;
     private final TopicPartition topicPartition;
 
-    private volatile Long checksum;
-
-    public RecordMetadata(TopicPartition topicPartition, long baseOffset, long relativeOffset, long timestamp,
-                          Long checksum, int serializedKeySize, int serializedValueSize) {
-        // ignore the relativeOffset if the base offset is -1,
-        // since this indicates the offset is unknown
-        this.offset = baseOffset == -1 ? baseOffset : baseOffset + relativeOffset;
+    /**
+     * Creates a new instance with the provided parameters.
+     */
+    public RecordMetadata(TopicPartition topicPartition, long baseOffset, int batchIndex, long timestamp,
+                          int serializedKeySize, int serializedValueSize) {
+        // ignore the batchIndex if the base offset is -1, since this indicates the offset is unknown
+        this.offset = baseOffset == -1 ? baseOffset : baseOffset + batchIndex;
         this.timestamp = timestamp;
-        this.checksum = checksum;
         this.serializedKeySize = serializedKeySize;
         this.serializedValueSize = serializedValueSize;
         this.topicPartition = topicPartition;
     }
 
     /**
+     * Creates a new instance with the provided parameters.
+     *
+     * @deprecated use constructor without `checksum` parameter. This constructor will be removed in
+     *             Apache Kafka 4.0 (deprecated since 3.0).
+     */
+    @Deprecated
+    public RecordMetadata(TopicPartition topicPartition, long baseOffset, long batchIndex, long timestamp,
+                          Long checksum, int serializedKeySize, int serializedValueSize) {
+        this(topicPartition, baseOffset, batchIndexToInt(batchIndex), timestamp, serializedKeySize, serializedValueSize);
+    }
+
+    private static int batchIndexToInt(long batchIndex) {
+        if (batchIndex > Integer.MAX_VALUE)
+            throw new IllegalArgumentException("batchIndex is larger than Integer.MAX_VALUE: " + batchIndex);
+        return (int) batchIndex;
+    }
+
+    /**
      * Indicates whether the record metadata includes the offset.
      * @return true if the offset is included in the metadata, false otherwise.
      */
@@ -90,25 +106,6 @@ public final class RecordMetadata {
     }
 
     /**
-     * The checksum (CRC32) of the record.
-     *
-     * @deprecated As of Kafka 0.11.0. Because of the potential for message format conversion on the broker, the
-     *             computed checksum may not match what was stored on the broker, or what will be returned to the consumer.
-     *             It is therefore unsafe to depend on this checksum for end-to-end delivery guarantees. Additionally,
-     *             message format v2 does not include a record-level checksum (for performance, the record checksum
-     *             was replaced with a batch checksum). To maintain compatibility, a partial checksum computed from
-     *             the record timestamp, serialized key size, and serialized value size is returned instead, but
-     *             this should not be depended on for end-to-end reliability.
-     */
-    @Deprecated
-    public long checksum() {
-        if (checksum == null)
-            // The checksum is null only for message format v2 and above, which do not have a record-level checksum.
-            this.checksum = DefaultRecord.computePartialChecksum(timestamp, serializedKeySize, serializedValueSize);
-        return this.checksum;
-    }
-
-    /**
      * The size of the serialized, uncompressed key in bytes. If key is null, the returned size
      * is -1.
      */
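
The deprecated RecordMetadata constructor still accepts a `long` batchIndex but
now delegates through `batchIndexToInt`, so out-of-range values fail fast
instead of silently truncating. A sketch, with made-up values:

    TopicPartition tp = new TopicPartition("topic", 0);
    new RecordMetadata(tp, 0L, 3L, 0L, null, 3, 5);          // ok; checksum ignored
    new RecordMetadata(tp, 0L, Integer.MAX_VALUE + 1L, 0L, null, 3, 5);
    // -> IllegalArgumentException: batchIndex is larger than Integer.MAX_VALUE

Note that the deprecated overload's `Long` checksum argument is now simply
discarded.
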
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/FutureRecordMetadata.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/FutureRecordMetadata.java
index a9665a9..0026237 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/FutureRecordMetadata.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/FutureRecordMetadata.java
@@ -32,18 +32,16 @@ public final class FutureRecordMetadata implements Future<RecordMetadata> {
     private final ProduceRequestResult result;
     private final int batchIndex;
     private final long createTimestamp;
-    private final Long checksum;
     private final int serializedKeySize;
     private final int serializedValueSize;
     private final Time time;
     private volatile FutureRecordMetadata nextRecordMetadata = null;
 
-    public FutureRecordMetadata(ProduceRequestResult result, int batchIndex, long createTimestamp,
-                                Long checksum, int serializedKeySize, int serializedValueSize, Time time) {
+    public FutureRecordMetadata(ProduceRequestResult result, int batchIndex, long createTimestamp, int serializedKeySize,
+                                int serializedValueSize, Time time) {
         this.result = result;
         this.batchIndex = batchIndex;
         this.createTimestamp = createTimestamp;
-        this.checksum = checksum;
         this.serializedKeySize = serializedKeySize;
         this.serializedValueSize = serializedValueSize;
         this.time = time;
@@ -101,15 +99,11 @@ public final class FutureRecordMetadata implements Future<RecordMetadata> {
             return value();
     }
 
-    Long checksumOrNull() {
-        return this.checksum;
-    }
-
     RecordMetadata value() {
         if (nextRecordMetadata != null)
             return nextRecordMetadata.value();
         return new RecordMetadata(result.topicPartition(), this.result.baseOffset(), this.batchIndex,
-                                  timestamp(), this.checksum, this.serializedKeySize, this.serializedValueSize);
+                                  timestamp(), this.serializedKeySize, this.serializedValueSize);
     }
 
     private long timestamp() {
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java
index 6f76c79..4da0362 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java
@@ -106,12 +106,12 @@ public final class ProducerBatch {
         if (!recordsBuilder.hasRoomFor(timestamp, key, value, headers)) {
             return null;
         } else {
-            Long checksum = this.recordsBuilder.append(timestamp, key, value, headers);
+            this.recordsBuilder.append(timestamp, key, value, headers);
             this.maxRecordSize = Math.max(this.maxRecordSize, AbstractRecords.estimateSizeInBytesUpperBound(magic(),
                     recordsBuilder.compressionType(), key, value, headers));
             this.lastAppendTime = now;
             FutureRecordMetadata future = new FutureRecordMetadata(this.produceFuture, this.recordCount,
-                                                                   timestamp, checksum,
+                                                                   timestamp,
                                                                    key == null ? -1 : key.length,
                                                                    value == null ? -1 : value.length,
                                                                    Time.SYSTEM);
@@ -136,7 +136,7 @@ public final class ProducerBatch {
             this.maxRecordSize = Math.max(this.maxRecordSize, AbstractRecords.estimateSizeInBytesUpperBound(magic(),
                     recordsBuilder.compressionType(), key, value, headers));
             FutureRecordMetadata future = new FutureRecordMetadata(this.produceFuture, this.recordCount,
-                                                                   timestamp, thunk.future.checksumOrNull(),
+                                                                   timestamp,
                                                                    key == null ? -1 : key.remaining(),
                                                                    value == null ? -1 : value.remaining(),
                                                                    Time.SYSTEM);
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerInterceptors.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerInterceptors.java
index 61a8b7a..ceec552 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerInterceptors.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerInterceptors.java
@@ -114,7 +114,7 @@ public class ProducerInterceptors<K, V> implements Closeable {
                                 record.partition() == null ? RecordMetadata.UNKNOWN_PARTITION : record.partition());
                     }
                     interceptor.onAcknowledgement(new RecordMetadata(interceptTopicPartition, -1, -1,
-                                    RecordBatch.NO_TIMESTAMP, Long.valueOf(-1L), -1, -1), exception);
+                                    RecordBatch.NO_TIMESTAMP, -1, -1), exception);
                 }
             } catch (Exception e) {
                 // do not propagate interceptor exceptions, just log
diff --git a/clients/src/main/java/org/apache/kafka/common/record/DefaultRecord.java b/clients/src/main/java/org/apache/kafka/common/record/DefaultRecord.java
index d85f100..bbaed94 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/DefaultRecord.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/DefaultRecord.java
@@ -20,8 +20,6 @@ import org.apache.kafka.common.InvalidRecordException;
 import org.apache.kafka.common.header.Header;
 import org.apache.kafka.common.header.internals.RecordHeader;
 import org.apache.kafka.common.utils.ByteUtils;
-import org.apache.kafka.common.utils.Checksums;
-import org.apache.kafka.common.utils.Crc32C;
 import org.apache.kafka.common.utils.PrimitiveRef;
 import org.apache.kafka.common.utils.PrimitiveRef.IntRef;
 import org.apache.kafka.common.utils.Utils;
@@ -33,7 +31,6 @@ import java.nio.BufferUnderflowException;
 import java.nio.ByteBuffer;
 import java.util.Arrays;
 import java.util.Objects;
-import java.util.zip.Checksum;
 
 import static org.apache.kafka.common.record.RecordBatch.MAGIC_VALUE_V2;
 
@@ -632,13 +629,4 @@ public class DefaultRecord implements Record {
         int valueSize = value == null ? -1 : value.remaining();
         return MAX_RECORD_OVERHEAD + sizeOf(keySize, valueSize, headers);
     }
-
-
-    public static long computePartialChecksum(long timestamp, int serializedKeySize, int serializedValueSize) {
-        Checksum checksum = Crc32C.create();
-        Checksums.updateLong(checksum, timestamp);
-        Checksums.updateInt(checksum, serializedKeySize);
-        Checksums.updateInt(checksum, serializedValueSize);
-        return checksum.getValue();
-    }
 }
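
If anything still depends on the removed `DefaultRecord.computePartialChecksum`,
the computation can be reproduced with the JDK's CRC32C (Java 9+). A
hypothetical drop-in sketch, assuming the removed Checksums helpers fed
big-endian bytes (most-significant byte first, which matches ByteBuffer's
default order):

    import java.nio.ByteBuffer;
    import java.util.zip.CRC32C;

    static long computePartialChecksum(long timestamp, int serializedKeySize,
                                       int serializedValueSize) {
        // 8 bytes (long) + 4 + 4 (ints), big-endian by default
        ByteBuffer buf = ByteBuffer.allocate(16);
        buf.putLong(timestamp).putInt(serializedKeySize).putInt(serializedValueSize);
        CRC32C crc = new CRC32C();
        crc.update(buf.array(), 0, buf.position());
        return crc.getValue();
    }

As the original deprecation notice said, this partial checksum should not be
relied on for end-to-end guarantees.
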
diff --git a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
index 91edf2c..59ad210 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
@@ -396,9 +396,9 @@ public class MemoryRecordsBuilder implements AutoCloseable {
     }
 
     /**
-     * Append a record and return its checksum for message format v0 and v1, or null for v2 and above.
+     * Append a new record at the given offset.
      */
-    private Long appendWithOffset(long offset, boolean isControlRecord, long timestamp, ByteBuffer key,
+    private void appendWithOffset(long offset, boolean isControlRecord, long timestamp, ByteBuffer key,
                                   ByteBuffer value, Header[] headers) {
         try {
             if (isControlRecord != isControlBatch)
@@ -419,9 +419,8 @@ public class MemoryRecordsBuilder implements AutoCloseable {
 
             if (magic > RecordBatch.MAGIC_VALUE_V1) {
                 appendDefaultRecord(offset, timestamp, key, value, headers);
-                return null;
             } else {
-                return appendLegacyRecord(offset, timestamp, key, value, magic);
+                appendLegacyRecord(offset, timestamp, key, value, magic);
             }
         } catch (IOException e) {
             throw new KafkaException("I/O exception when writing to the append stream, closing", e);
@@ -435,10 +434,9 @@ public class MemoryRecordsBuilder implements AutoCloseable {
      * @param key The record key
      * @param value The record value
      * @param headers The record headers if there are any
-     * @return CRC of the record or null if record-level CRC is not supported for the message format
      */
-    public Long appendWithOffset(long offset, long timestamp, byte[] key, byte[] value, Header[] headers) {
-        return appendWithOffset(offset, false, timestamp, wrapNullable(key), wrapNullable(value), headers);
+    public void appendWithOffset(long offset, long timestamp, byte[] key, byte[] value, Header[] headers) {
+        appendWithOffset(offset, false, timestamp, wrapNullable(key), wrapNullable(value), headers);
     }
 
     /**
@@ -448,10 +446,9 @@ public class MemoryRecordsBuilder implements AutoCloseable {
      * @param key The record key
      * @param value The record value
      * @param headers The record headers if there are any
-     * @return CRC of the record or null if record-level CRC is not supported for the message format
      */
-    public Long appendWithOffset(long offset, long timestamp, ByteBuffer key, ByteBuffer value, Header[] headers) {
-        return appendWithOffset(offset, false, timestamp, key, value, headers);
+    public void appendWithOffset(long offset, long timestamp, ByteBuffer key, ByteBuffer value, Header[] headers) {
+        appendWithOffset(offset, false, timestamp, key, value, headers);
     }
 
     /**
@@ -460,10 +457,9 @@ public class MemoryRecordsBuilder implements AutoCloseable {
      * @param timestamp The record timestamp
      * @param key The record key
      * @param value The record value
-     * @return CRC of the record or null if record-level CRC is not supported for the message format
      */
-    public Long appendWithOffset(long offset, long timestamp, byte[] key, byte[] value) {
-        return appendWithOffset(offset, timestamp, wrapNullable(key), wrapNullable(value), Record.EMPTY_HEADERS);
+    public void appendWithOffset(long offset, long timestamp, byte[] key, byte[] value) {
+        appendWithOffset(offset, timestamp, wrapNullable(key), wrapNullable(value), Record.EMPTY_HEADERS);
     }
 
     /**
@@ -472,20 +468,18 @@ public class MemoryRecordsBuilder implements AutoCloseable {
      * @param timestamp The record timestamp
      * @param key The record key
      * @param value The record value
-     * @return CRC of the record or null if record-level CRC is not supported for the message format
      */
-    public Long appendWithOffset(long offset, long timestamp, ByteBuffer key, ByteBuffer value) {
-        return appendWithOffset(offset, timestamp, key, value, Record.EMPTY_HEADERS);
+    public void appendWithOffset(long offset, long timestamp, ByteBuffer key, ByteBuffer value) {
+        appendWithOffset(offset, timestamp, key, value, Record.EMPTY_HEADERS);
     }
 
     /**
      * Append a new record at the given offset.
      * @param offset The absolute offset of the record in the log buffer
      * @param record The record to append
-     * @return CRC of the record or null if record-level CRC is not supported for the message format
      */
-    public Long appendWithOffset(long offset, SimpleRecord record) {
-        return appendWithOffset(offset, record.timestamp(), record.key(), record.value(), record.headers());
+    public void appendWithOffset(long offset, SimpleRecord record) {
+        appendWithOffset(offset, record.timestamp(), record.key(), record.value(), record.headers());
     }
 
     /**
@@ -494,15 +488,14 @@ public class MemoryRecordsBuilder implements AutoCloseable {
      *
      * @param offset The absolute offset of the record in the log buffer
      * @param record The record to append
-     * @return CRC of the record or null if record-level CRC is not supported for the message format
      */
-    public Long appendControlRecordWithOffset(long offset, SimpleRecord record) {
+    public void appendControlRecordWithOffset(long offset, SimpleRecord record) {
         short typeId = ControlRecordType.parseTypeId(record.key());
         ControlRecordType type = ControlRecordType.fromTypeId(typeId);
         if (type == ControlRecordType.UNKNOWN)
             throw new IllegalArgumentException("Cannot append record with unknown control record type " + typeId);
 
-        return appendWithOffset(offset, true, record.timestamp(),
+        appendWithOffset(offset, true, record.timestamp(),
             record.key(), record.value(), record.headers());
     }
 
@@ -511,10 +504,9 @@ public class MemoryRecordsBuilder implements AutoCloseable {
      * @param timestamp The record timestamp
      * @param key The record key
      * @param value The record value
-     * @return CRC of the record or null if record-level CRC is not supported for the message format
      */
-    public Long append(long timestamp, ByteBuffer key, ByteBuffer value) {
-        return append(timestamp, key, value, Record.EMPTY_HEADERS);
+    public void append(long timestamp, ByteBuffer key, ByteBuffer value) {
+        append(timestamp, key, value, Record.EMPTY_HEADERS);
     }
 
     /**
@@ -525,8 +517,8 @@ public class MemoryRecordsBuilder implements AutoCloseable {
      * @param headers The record headers if there are any
      * @return CRC of the record or null if record-level CRC is not supported for the message format
      */
-    public Long append(long timestamp, ByteBuffer key, ByteBuffer value, Header[] headers) {
-        return appendWithOffset(nextSequentialOffset(), timestamp, key, value, headers);
+    public void append(long timestamp, ByteBuffer key, ByteBuffer value, Header[] headers) {
+        appendWithOffset(nextSequentialOffset(), timestamp, key, value, headers);
     }
 
     /**
@@ -536,8 +528,8 @@ public class MemoryRecordsBuilder implements AutoCloseable {
      * @param value The record value
      * @return CRC of the record or null if record-level CRC is not supported for the message format
      */
-    public Long append(long timestamp, byte[] key, byte[] value) {
-        return append(timestamp, wrapNullable(key), wrapNullable(value), Record.EMPTY_HEADERS);
+    public void append(long timestamp, byte[] key, byte[] value) {
+        append(timestamp, wrapNullable(key), wrapNullable(value), Record.EMPTY_HEADERS);
     }
 
     /**
@@ -546,19 +538,17 @@ public class MemoryRecordsBuilder implements AutoCloseable {
      * @param key The record key
      * @param value The record value
      * @param headers The record headers if there are any
-     * @return CRC of the record or null if record-level CRC is not supported for the message format
      */
-    public Long append(long timestamp, byte[] key, byte[] value, Header[] headers) {
-        return append(timestamp, wrapNullable(key), wrapNullable(value), headers);
+    public void append(long timestamp, byte[] key, byte[] value, Header[] headers) {
+        append(timestamp, wrapNullable(key), wrapNullable(value), headers);
     }
 
     /**
      * Append a new record at the next sequential offset.
      * @param record The record to append
-     * @return CRC of the record or null if record-level CRC is not supported for the message format
      */
-    public Long append(SimpleRecord record) {
-        return appendWithOffset(nextSequentialOffset(), record);
+    public void append(SimpleRecord record) {
+        appendWithOffset(nextSequentialOffset(), record);
     }
 
     /**
@@ -566,36 +556,29 @@ public class MemoryRecordsBuilder implements AutoCloseable {
      * @param timestamp The record timestamp
      * @param type The control record type (cannot be UNKNOWN)
      * @param value The control record value
-     * @return CRC of the record or null if record-level CRC is not supported for the message format
      */
-    private Long appendControlRecord(long timestamp, ControlRecordType type, ByteBuffer value) {
+    private void appendControlRecord(long timestamp, ControlRecordType type, ByteBuffer value) {
         Struct keyStruct = type.recordKey();
         ByteBuffer key = ByteBuffer.allocate(keyStruct.sizeOf());
         keyStruct.writeTo(key);
         key.flip();
-        return appendWithOffset(nextSequentialOffset(), true, timestamp, key, value, Record.EMPTY_HEADERS);
+        appendWithOffset(nextSequentialOffset(), true, timestamp, key, value, Record.EMPTY_HEADERS);
     }
 
-    /**
-     * Return CRC of the record or null if record-level CRC is not supported for the message format
-     */
-    public Long appendEndTxnMarker(long timestamp, EndTransactionMarker marker) {
+    public void appendEndTxnMarker(long timestamp, EndTransactionMarker marker) {
         if (producerId == RecordBatch.NO_PRODUCER_ID)
             throw new IllegalArgumentException("End transaction marker requires a valid producerId");
         if (!isTransactional)
             throw new IllegalArgumentException("End transaction marker depends on batch transactional flag being enabled");
         ByteBuffer value = marker.serializeValue();
-        return appendControlRecord(timestamp, marker.controlType(), value);
+        appendControlRecord(timestamp, marker.controlType(), value);
     }
 
-    /**
-     * Return CRC of the record or null if record-level CRC is not supported for the message format
-     */
-    public Long appendLeaderChangeMessage(long timestamp, LeaderChangeMessage leaderChangeMessage) {
+    public void appendLeaderChangeMessage(long timestamp, LeaderChangeMessage leaderChangeMessage) {
         if (partitionLeaderEpoch == RecordBatch.NO_PARTITION_LEADER_EPOCH) {
             throw new IllegalArgumentException("Partition leader epoch must be valid, but get " + partitionLeaderEpoch);
         }
-        return appendControlRecord(timestamp, ControlRecordType.LEADER_CHANGE,
+        appendControlRecord(timestamp, ControlRecordType.LEADER_CHANGE,
                 MessageUtil.toByteBuffer(leaderChangeMessage, ControlRecordUtils.LEADER_CHANGE_SCHEMA_VERSION));
     }
 
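
All of the MemoryRecordsBuilder `append*` methods now return void; callers that
captured the returned CRC, as ProducerBatch did above, simply drop it. A minimal
sketch of the internal API after this change, assuming the
`MemoryRecords.builder(buffer, compressionType, timestampType, baseOffset)`
overload available on trunk at the time:

    import java.nio.ByteBuffer;
    import org.apache.kafka.common.record.CompressionType;
    import org.apache.kafka.common.record.MemoryRecords;
    import org.apache.kafka.common.record.MemoryRecordsBuilder;
    import org.apache.kafka.common.record.TimestampType;

    ByteBuffer buffer = ByteBuffer.allocate(1024);
    MemoryRecordsBuilder builder = MemoryRecords.builder(
            buffer, CompressionType.NONE, TimestampType.CREATE_TIME, 0L);
    builder.append(System.currentTimeMillis(), "key".getBytes(), "value".getBytes());
    MemoryRecords records = builder.build();   // append no longer yields a checksum
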
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerRecordTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerRecordTest.java
index 2dc1dde..848f75c 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerRecordTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerRecordTest.java
@@ -16,11 +16,12 @@
  */
 package org.apache.kafka.clients.consumer;
 
+import org.apache.kafka.common.header.internals.RecordHeader;
 import org.apache.kafka.common.header.internals.RecordHeaders;
-import org.apache.kafka.common.record.DefaultRecord;
 import org.apache.kafka.common.record.TimestampType;
 import org.junit.jupiter.api.Test;
 
+import java.nio.charset.StandardCharsets;
 import java.util.Optional;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -28,8 +29,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
 public class ConsumerRecordTest {
 
     @Test
-    @SuppressWarnings("deprecation")
-    public void testOldConstructor() {
+    public void testShortConstructor() {
         String topic = "topic";
         int partition = 0;
         long offset = 23;
@@ -44,7 +44,6 @@ public class ConsumerRecordTest {
         assertEquals(value, record.value());
         assertEquals(TimestampType.NO_TIMESTAMP_TYPE, record.timestampType());
         assertEquals(ConsumerRecord.NO_TIMESTAMP, record.timestamp());
-        assertEquals(ConsumerRecord.NULL_CHECKSUM, record.checksum());
         assertEquals(ConsumerRecord.NULL_SIZE, record.serializedKeySize());
         assertEquals(ConsumerRecord.NULL_SIZE, record.serializedValueSize());
         assertEquals(Optional.empty(), record.leaderEpoch());
@@ -52,14 +51,63 @@ public class ConsumerRecordTest {
     }
 
     @Test
-    @SuppressWarnings("deprecation")
-    public void testNullChecksumInConstructor() {
+    @Deprecated
+    public void testConstructorsWithChecksum() {
+        String topic = "topic";
+        int partition = 0;
+        long offset = 23;
+        long timestamp = 23434217432432L;
+        TimestampType timestampType = TimestampType.CREATE_TIME;
         String key = "key";
         String value = "value";
-        long timestamp = 242341324L;
-        ConsumerRecord<String, String> record = new ConsumerRecord<>("topic", 0, 23L, timestamp,
-                TimestampType.CREATE_TIME, null, key.length(), value.length(), key, value, new RecordHeaders());
-        assertEquals(DefaultRecord.computePartialChecksum(timestamp, key.length(), value.length()), record.checksum());
+        long checksum = 50L;
+        int serializedKeySize = 100;
+        int serializedValueSize = 1142;
+
+        ConsumerRecord<String, String> record = new ConsumerRecord<>(topic, partition, offset, timestamp, timestampType,
+            checksum, serializedKeySize, serializedValueSize, key, value);
+        assertEquals(topic, record.topic());
+        assertEquals(partition, record.partition());
+        assertEquals(offset, record.offset());
+        assertEquals(key, record.key());
+        assertEquals(value, record.value());
+        assertEquals(timestampType, record.timestampType());
+        assertEquals(timestamp, record.timestamp());
+        assertEquals(serializedKeySize, record.serializedKeySize());
+        assertEquals(serializedValueSize, record.serializedValueSize());
+        assertEquals(Optional.empty(), record.leaderEpoch());
+        assertEquals(new RecordHeaders(), record.headers());
+
+        RecordHeaders headers = new RecordHeaders();
+        headers.add(new RecordHeader("header key", "header value".getBytes(StandardCharsets.UTF_8)));
+        record = new ConsumerRecord<>(topic, partition, offset, timestamp, timestampType,
+                checksum, serializedKeySize, serializedValueSize, key, value, headers);
+        assertEquals(topic, record.topic());
+        assertEquals(partition, record.partition());
+        assertEquals(offset, record.offset());
+        assertEquals(key, record.key());
+        assertEquals(value, record.value());
+        assertEquals(timestampType, record.timestampType());
+        assertEquals(timestamp, record.timestamp());
+        assertEquals(serializedKeySize, record.serializedKeySize());
+        assertEquals(serializedValueSize, record.serializedValueSize());
+        assertEquals(Optional.empty(), record.leaderEpoch());
+        assertEquals(headers, record.headers());
+
+        Optional<Integer> leaderEpoch = Optional.of(10);
+        record = new ConsumerRecord<>(topic, partition, offset, timestamp, timestampType,
+                checksum, serializedKeySize, serializedValueSize, key, value, headers, leaderEpoch);
+        assertEquals(topic, record.topic());
+        assertEquals(partition, record.partition());
+        assertEquals(offset, record.offset());
+        assertEquals(key, record.key());
+        assertEquals(value, record.value());
+        assertEquals(timestampType, record.timestampType());
+        assertEquals(timestamp, record.timestamp());
+        assertEquals(serializedKeySize, record.serializedKeySize());
+        assertEquals(serializedValueSize, record.serializedValueSize());
+        assertEquals(leaderEpoch, record.leaderEpoch());
+        assertEquals(headers, record.headers());
     }
 
 }
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerRecordsTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerRecordsTest.java
index e34b6b1..d414450 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerRecordsTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerRecordsTest.java
@@ -24,7 +24,10 @@ import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
+
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.header.internals.RecordHeaders;
 import org.apache.kafka.common.record.TimestampType;
 import org.junit.jupiter.api.Test;
 
@@ -37,10 +40,12 @@ public class ConsumerRecordsTest {
 
         String topic = "topic";
         records.put(new TopicPartition(topic, 0), new ArrayList<ConsumerRecord<Integer, String>>());
-        ConsumerRecord<Integer, String> record1 = new ConsumerRecord<>(topic, 1, 0, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, 1, "value1");
-        ConsumerRecord<Integer, String> record2 = new ConsumerRecord<>(topic, 1, 1, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, 2, "value2");
+        ConsumerRecord<Integer, String> record1 = new ConsumerRecord<>(topic, 1, 0, 0L, TimestampType.CREATE_TIME,
+            0, 0, 1, "value1", new RecordHeaders(), Optional.empty());
+        ConsumerRecord<Integer, String> record2 = new ConsumerRecord<>(topic, 1, 1, 0L, TimestampType.CREATE_TIME,
+            0, 0, 2, "value2", new RecordHeaders(), Optional.empty());
         records.put(new TopicPartition(topic, 1), Arrays.asList(record1, record2));
-        records.put(new TopicPartition(topic, 2), new ArrayList<ConsumerRecord<Integer, String>>());
+        records.put(new TopicPartition(topic, 2), new ArrayList<>());
 
         ConsumerRecords<Integer, String> consumerRecords = new ConsumerRecords<>(records);
         Iterator<ConsumerRecord<Integer, String>> iter = consumerRecords.iterator();
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java
index 9a74b06..be2ea0e 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java
@@ -17,6 +17,7 @@
 package org.apache.kafka.clients.consumer;
 
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.header.internals.RecordHeaders;
 import org.apache.kafka.common.record.TimestampType;
 import org.junit.jupiter.api.Test;
 
@@ -47,8 +48,10 @@ public class MockConsumerTest {
         beginningOffsets.put(new TopicPartition("test", 1), 0L);
         consumer.updateBeginningOffsets(beginningOffsets);
         consumer.seek(new TopicPartition("test", 0), 0);
-        ConsumerRecord<String, String> rec1 = new ConsumerRecord<>("test", 0, 0, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, "key1", "value1");
-        ConsumerRecord<String, String> rec2 = new ConsumerRecord<>("test", 0, 1, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, "key2", "value2");
+        ConsumerRecord<String, String> rec1 = new ConsumerRecord<>("test", 0, 0, 0L, TimestampType.CREATE_TIME,
+            0, 0, "key1", "value1", new RecordHeaders(), Optional.empty());
+        ConsumerRecord<String, String> rec2 = new ConsumerRecord<>("test", 0, 1, 0L, TimestampType.CREATE_TIME,
+            0, 0, "key2", "value2", new RecordHeaders(), Optional.empty());
         consumer.addRecord(rec1);
         consumer.addRecord(rec2);
         ConsumerRecords<String, String> recs = consumer.poll(Duration.ofMillis(1));
@@ -74,8 +77,10 @@ public class MockConsumerTest {
         beginningOffsets.put(new TopicPartition("test", 1), 0L);
         consumer.updateBeginningOffsets(beginningOffsets);
         consumer.seek(new TopicPartition("test", 0), 0);
-        ConsumerRecord<String, String> rec1 = new ConsumerRecord<>("test", 0, 0, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, "key1", "value1");
-        ConsumerRecord<String, String> rec2 = new ConsumerRecord<>("test", 0, 1, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, "key2", "value2");
+        ConsumerRecord<String, String> rec1 = new ConsumerRecord<>("test", 0, 0, 0L, TimestampType.CREATE_TIME,
+            0, 0, "key1", "value1", new RecordHeaders(), Optional.empty());
+        ConsumerRecord<String, String> rec2 = new ConsumerRecord<>("test", 0, 1, 0L, TimestampType.CREATE_TIME,
+            0, 0, "key2", "value2", new RecordHeaders(), Optional.empty());
         consumer.addRecord(rec1);
         consumer.addRecord(rec2);
         ConsumerRecords<String, String> recs = consumer.poll(1);
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerInterceptorsTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerInterceptorsTest.java
index 1fad0c5..19ac256 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerInterceptorsTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerInterceptorsTest.java
@@ -23,6 +23,7 @@ import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.header.internals.RecordHeaders;
 import org.apache.kafka.common.record.TimestampType;
 import org.junit.jupiter.api.Test;
 
@@ -30,6 +31,7 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.HashMap;
+import java.util.Optional;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
@@ -43,8 +45,8 @@ public class ConsumerInterceptorsTest {
     private final TopicPartition tp = new TopicPartition(topic, partition);
     private final TopicPartition filterTopicPart1 = new TopicPartition("test5", filterPartition1);
     private final TopicPartition filterTopicPart2 = new TopicPartition("test6", filterPartition2);
-    private final ConsumerRecord<Integer, Integer> consumerRecord =
-        new ConsumerRecord<>(topic, partition, 0, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, 1, 1);
+    private final ConsumerRecord<Integer, Integer> consumerRecord = new ConsumerRecord<>(topic, partition, 0, 0L,
+        TimestampType.CREATE_TIME, 0, 0, 1, 1, new RecordHeaders(), Optional.empty());
     private int onCommitCount = 0;
     private int onConsumeCount = 0;
 
@@ -117,11 +119,11 @@ public class ConsumerInterceptorsTest {
         List<ConsumerRecord<Integer, Integer>> list1 = new ArrayList<>();
         list1.add(consumerRecord);
         List<ConsumerRecord<Integer, Integer>> list2 = new ArrayList<>();
-        list2.add(new ConsumerRecord<>(filterTopicPart1.topic(), filterTopicPart1.partition(), 0,
-                                       0L, TimestampType.CREATE_TIME, 0L, 0, 0, 1, 1));
+        list2.add(new ConsumerRecord<>(filterTopicPart1.topic(), filterTopicPart1.partition(), 0, 0L,
+            TimestampType.CREATE_TIME, 0, 0, 1, 1, new RecordHeaders(), Optional.empty()));
         List<ConsumerRecord<Integer, Integer>> list3 = new ArrayList<>();
-        list3.add(new ConsumerRecord<>(filterTopicPart2.topic(), filterTopicPart2.partition(), 0,
-                                       0L, TimestampType.CREATE_TIME, 0L, 0, 0, 1, 1));
+        list3.add(new ConsumerRecord<>(filterTopicPart2.topic(), filterTopicPart2.partition(), 0, 0L, TimestampType.CREATE_TIME,
+            0, 0, 1, 1, new RecordHeaders(), Optional.empty()));
         records.put(tp, list1);
         records.put(filterTopicPart1, list2);
         records.put(filterTopicPart2, list3);
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/RecordMetadataTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/RecordMetadataTest.java
index 51764dc..b6207fb 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/RecordMetadataTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/RecordMetadataTest.java
@@ -17,7 +17,6 @@
 package org.apache.kafka.clients.producer;
 
 import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.record.DefaultRecord;
 import org.junit.jupiter.api.Test;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -26,56 +25,66 @@ import static org.junit.jupiter.api.Assertions.assertFalse;
 public class RecordMetadataTest {
 
     @Test
-    @SuppressWarnings("deprecation")
-    public void testConstructionWithMissingRelativeOffset() {
+    public void testConstructionWithMissingBatchIndex() {
         TopicPartition tp = new TopicPartition("foo", 0);
         long timestamp = 2340234L;
         int keySize = 3;
         int valueSize = 5;
-        Long checksum = 908923L;
 
-        RecordMetadata metadata = new RecordMetadata(tp, -1L, -1L, timestamp, checksum, keySize, valueSize);
+        RecordMetadata metadata = new RecordMetadata(tp, -1L, -1, timestamp, keySize, valueSize);
         assertEquals(tp.topic(), metadata.topic());
         assertEquals(tp.partition(), metadata.partition());
         assertEquals(timestamp, metadata.timestamp());
         assertFalse(metadata.hasOffset());
         assertEquals(-1L, metadata.offset());
-        assertEquals(checksum.longValue(), metadata.checksum());
         assertEquals(keySize, metadata.serializedKeySize());
         assertEquals(valueSize, metadata.serializedValueSize());
     }
 
     @Test
-    @SuppressWarnings("deprecation")
-    public void testConstructionWithRelativeOffset() {
+    public void testConstructionWithBatchIndexOffset() {
         TopicPartition tp = new TopicPartition("foo", 0);
         long timestamp = 2340234L;
         int keySize = 3;
         int valueSize = 5;
         long baseOffset = 15L;
-        long relativeOffset = 3L;
-        Long checksum = 908923L;
+        int batchIndex = 3;
 
-        RecordMetadata metadata = new RecordMetadata(tp, baseOffset, relativeOffset, timestamp, checksum,
-                keySize, valueSize);
+        RecordMetadata metadata = new RecordMetadata(tp, baseOffset, batchIndex, timestamp, keySize, valueSize);
         assertEquals(tp.topic(), metadata.topic());
         assertEquals(tp.partition(), metadata.partition());
         assertEquals(timestamp, metadata.timestamp());
-        assertEquals(baseOffset + relativeOffset, metadata.offset());
-        assertEquals(checksum.longValue(), metadata.checksum());
+        assertEquals(baseOffset + batchIndex, metadata.offset());
         assertEquals(keySize, metadata.serializedKeySize());
         assertEquals(valueSize, metadata.serializedValueSize());
     }
 
     @Test
-    @SuppressWarnings("deprecation")
-    public void testNullChecksum() {
+    @Deprecated
+    public void testConstructionWithChecksum() {
+        TopicPartition tp = new TopicPartition("foo", 0);
         long timestamp = 2340234L;
+        long baseOffset = 15L;
+        long batchIndex = 3L;
         int keySize = 3;
         int valueSize = 5;
-        RecordMetadata metadata = new RecordMetadata(new TopicPartition("foo", 0), 15L, 3L, timestamp, null,
-                keySize, valueSize);
-        assertEquals(DefaultRecord.computePartialChecksum(timestamp, keySize, valueSize), metadata.checksum());
+
+        RecordMetadata metadata = new RecordMetadata(tp, baseOffset, batchIndex, timestamp, null, keySize, valueSize);
+        assertEquals(tp.topic(), metadata.topic());
+        assertEquals(tp.partition(), metadata.partition());
+        assertEquals(timestamp, metadata.timestamp());
+        assertEquals(baseOffset + batchIndex, metadata.offset());
+        assertEquals(keySize, metadata.serializedKeySize());
+        assertEquals(valueSize, metadata.serializedValueSize());
+
+        long checksum = 133424L;
+        metadata = new RecordMetadata(tp, baseOffset, batchIndex, timestamp, checksum, keySize, valueSize);
+        assertEquals(tp.topic(), metadata.topic());
+        assertEquals(tp.partition(), metadata.partition());
+        assertEquals(timestamp, metadata.timestamp());
+        assertEquals(baseOffset + batchIndex, metadata.offset());
+        assertEquals(keySize, metadata.serializedKeySize());
+        assertEquals(valueSize, metadata.serializedValueSize());
     }
 
 }
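
Likewise on the producer side, a hedged sketch of the new six-argument `RecordMetadata` constructor and the offset semantics the tests above assert (class name and values are illustrative):

    import org.apache.kafka.clients.producer.RecordMetadata;
    import org.apache.kafka.common.TopicPartition;

    public class RecordMetadataSketch {
        public static void main(String[] args) {
            TopicPartition tp = new TopicPartition("foo", 0);
            // batchIndex replaces the old relativeOffset; there is no checksum argument.
            RecordMetadata metadata = new RecordMetadata(tp, 15L, 3, 2340234L, 3, 5);
            System.out.println(metadata.offset());    // baseOffset + batchIndex = 18
            // A base offset of -1 means the offset is unknown.
            RecordMetadata unknown = new RecordMetadata(tp, -1L, -1, 2340234L, 3, 5);
            System.out.println(unknown.hasOffset());  // false
        }
    }
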
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/RecordSendTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/RecordSendTest.java
index b7cfd4e..86632ee 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/RecordSendTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/RecordSendTest.java
@@ -48,7 +48,7 @@ public class RecordSendTest {
     public void testTimeout() throws Exception {
         ProduceRequestResult request = new ProduceRequestResult(topicPartition);
         FutureRecordMetadata future = new FutureRecordMetadata(request, relOffset,
-                RecordBatch.NO_TIMESTAMP, 0L, 0, 0, Time.SYSTEM);
+                RecordBatch.NO_TIMESTAMP, 0, 0, Time.SYSTEM);
         assertFalse(future.isDone(), "Request is not completed");
         try {
             future.get(5, TimeUnit.MILLISECONDS);
@@ -68,7 +68,7 @@ public class RecordSendTest {
     @Test
     public void testError() throws Exception {
         FutureRecordMetadata future = new FutureRecordMetadata(asyncRequest(baseOffset, new CorruptRecordException(), 50L),
-                relOffset, RecordBatch.NO_TIMESTAMP, 0L, 0, 0, Time.SYSTEM);
+                relOffset, RecordBatch.NO_TIMESTAMP, 0, 0, Time.SYSTEM);
         assertThrows(ExecutionException.class, future::get);
     }
 
@@ -78,7 +78,7 @@ public class RecordSendTest {
     @Test
     public void testBlocking() throws Exception {
         FutureRecordMetadata future = new FutureRecordMetadata(asyncRequest(baseOffset, null, 50L),
-                relOffset, RecordBatch.NO_TIMESTAMP, 0L, 0, 0, Time.SYSTEM);
+                relOffset, RecordBatch.NO_TIMESTAMP, 0, 0, Time.SYSTEM);
         assertEquals(baseOffset + relOffset, future.get().offset());
     }
 
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/FutureRecordMetadataTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/FutureRecordMetadataTest.java
index 74649c8..1fd7a43 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/FutureRecordMetadataTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/FutureRecordMetadataTest.java
@@ -67,7 +67,6 @@ public class FutureRecordMetadataTest {
                 produceRequestResult,
                 0,
                 RecordBatch.NO_TIMESTAMP,
-                0L,
                 0,
                 0,
                 time
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/ProducerBatchTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/ProducerBatchTest.java
index b6ee582..6af8289 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/ProducerBatchTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/ProducerBatchTest.java
@@ -23,7 +23,6 @@ import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.header.Header;
 import org.apache.kafka.common.header.internals.RecordHeader;
 import org.apache.kafka.common.record.CompressionType;
-import org.apache.kafka.common.record.LegacyRecord;
 import org.apache.kafka.common.record.MemoryRecords;
 import org.apache.kafka.common.record.MemoryRecordsBuilder;
 import org.apache.kafka.common.record.Record;
@@ -61,14 +60,6 @@ public class ProducerBatchTest {
             CompressionType.NONE, TimestampType.CREATE_TIME, 128);
 
     @Test
-    public void testChecksumNullForMagicV2() {
-        ProducerBatch batch = new ProducerBatch(new TopicPartition("topic", 1), memoryRecordsBuilder, now);
-        FutureRecordMetadata future = batch.tryAppend(now, null, new byte[10], Record.EMPTY_HEADERS, null, now);
-        assertNotNull(future);
-        assertNull(future.checksumOrNull());
-    }
-
-    @Test
     public void testBatchAbort() throws Exception {
         ProducerBatch batch = new ProducerBatch(new TopicPartition("topic", 1), memoryRecordsBuilder, now);
         MockCallback callback = new MockCallback();
@@ -139,23 +130,6 @@ public class ProducerBatchTest {
     }
 
     @Test
-    public void testAppendedChecksumMagicV0AndV1() {
-        for (byte magic : Arrays.asList(MAGIC_VALUE_V0, MAGIC_VALUE_V1)) {
-            MemoryRecordsBuilder builder = MemoryRecords.builder(ByteBuffer.allocate(128), magic,
-                    CompressionType.NONE, TimestampType.CREATE_TIME, 0L);
-            ProducerBatch batch = new ProducerBatch(new TopicPartition("topic", 1), builder, now);
-            byte[] key = "hi".getBytes();
-            byte[] value = "there".getBytes();
-
-            FutureRecordMetadata future = batch.tryAppend(now, key, value, Record.EMPTY_HEADERS, null, now);
-            assertNotNull(future);
-            byte attributes = LegacyRecord.computeAttributes(magic, CompressionType.NONE, TimestampType.CREATE_TIME);
-            long expectedChecksum = LegacyRecord.computeChecksum(magic, attributes, now, key, value);
-            assertEquals(expectedChecksum, future.checksumOrNull().longValue());
-        }
-    }
-
-    @Test
     public void testSplitPreservesHeaders() {
         for (CompressionType compressionType : CompressionType.values()) {
             MemoryRecordsBuilder builder = MemoryRecords.builder(
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/ProducerInterceptorsTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/ProducerInterceptorsTest.java
index 3cc63a9..cd15a3e 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/ProducerInterceptorsTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/ProducerInterceptorsTest.java
@@ -144,7 +144,7 @@ public class ProducerInterceptorsTest {
         ProducerInterceptors<Integer, String> interceptors = new ProducerInterceptors<>(interceptorList);
 
         // verify onAck is called on all interceptors
-        RecordMetadata meta = new RecordMetadata(tp, 0, 0, 0, Long.valueOf(0L), 0, 0);
+        RecordMetadata meta = new RecordMetadata(tp, 0, 0, 0, 0, 0);
         interceptors.onAcknowledgement(meta, null);
         assertEquals(2, onAckCount);
 
diff --git a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java
index 13eaa9d..5dedf66 100644
--- a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java
@@ -437,11 +437,10 @@ public class MemoryRecordsBuilderTest {
                 TimestampType.CREATE_TIME, 0L, LegacyRecord.NO_TIMESTAMP, RecordBatch.NO_PRODUCER_ID,
                 RecordBatch.NO_PRODUCER_EPOCH, RecordBatch.NO_SEQUENCE, false, false,
                 RecordBatch.NO_PARTITION_LEADER_EPOCH, buffer.capacity());
-        Long checksumOrNull = builder.append(1L, "key".getBytes(), "value".getBytes());
+        builder.append(1L, "key".getBytes(), "value".getBytes());
         MemoryRecords memoryRecords = builder.build();
         List<Record> records = TestUtils.toList(memoryRecords.records());
         assertEquals(1, records.size());
-        assertEquals(checksumOrNull, records.get(0).checksumOrNull());
     }
 
     @ParameterizedTest
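
`MemoryRecordsBuilder.append` no longer hands back a record checksum, which is why the assertion above is dropped. A small sketch of the resulting call pattern (this is internal API, shown with the current-magic default builder; the buffer size and values are illustrative):

    import java.nio.ByteBuffer;
    import org.apache.kafka.common.record.CompressionType;
    import org.apache.kafka.common.record.MemoryRecords;
    import org.apache.kafka.common.record.MemoryRecordsBuilder;
    import org.apache.kafka.common.record.Record;
    import org.apache.kafka.common.record.TimestampType;

    public class BuilderSketch {
        public static void main(String[] args) {
            MemoryRecordsBuilder builder = MemoryRecords.builder(ByteBuffer.allocate(128),
                CompressionType.NONE, TimestampType.CREATE_TIME, 0L);
            builder.append(1L, "key".getBytes(), "value".getBytes()); // no checksum returned
            MemoryRecords memoryRecords = builder.build();
            for (Record record : memoryRecords.records())
                System.out.println(record.offset() + ": " + record.timestamp());
        }
    }
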
diff --git a/clients/src/test/java/org/apache/kafka/test/MockConsumerInterceptor.java b/clients/src/test/java/org/apache/kafka/test/MockConsumerInterceptor.java
index 3fcc157..b01584b 100644
--- a/clients/src/test/java/org/apache/kafka/test/MockConsumerInterceptor.java
+++ b/clients/src/test/java/org/apache/kafka/test/MockConsumerInterceptor.java
@@ -26,12 +26,14 @@ import org.apache.kafka.common.ClusterResourceListener;
 import org.apache.kafka.common.ClusterResource;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.common.header.internals.RecordHeaders;
 
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
+import java.util.Optional;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 
@@ -56,7 +58,6 @@ public class MockConsumerInterceptor implements ClusterResourceListener, Consume
     }
 
     @Override
-    @SuppressWarnings("deprecation")
     public ConsumerRecords<String, String> onConsume(ConsumerRecords<String, String> records) {
 
         // This will ensure that we get the cluster metadata when onConsume is called for the first time
@@ -69,13 +70,14 @@ public class MockConsumerInterceptor implements ClusterResourceListener, Consume
             for (ConsumerRecord<String, String> record: records.records(tp)) {
                 lst.add(new ConsumerRecord<>(record.topic(), record.partition(), record.offset(),
                                              record.timestamp(), record.timestampType(),
-                                             record.checksum(), record.serializedKeySize(),
+                                             record.serializedKeySize(),
                                              record.serializedValueSize(),
-                                             record.key(), record.value().toUpperCase(Locale.ROOT)));
+                                             record.key(), record.value().toUpperCase(Locale.ROOT),
+                                             new RecordHeaders(), Optional.empty()));
             }
             recordMap.put(tp, lst);
         }
-        return new ConsumerRecords<String, String>(recordMap);
+        return new ConsumerRecords<>(recordMap);
     }
 
     @Override
diff --git a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceTaskTest.java b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceTaskTest.java
index 0b47cc9..9cf09f8 100644
--- a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceTaskTest.java
+++ b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceTaskTest.java
@@ -31,6 +31,7 @@ import org.junit.jupiter.api.Test;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
+import java.util.Optional;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -49,7 +50,7 @@ public class MirrorSourceTaskTest {
         headers.add("header1", new byte[]{'l', 'm', 'n', 'o'});
         headers.add("header2", new byte[]{'p', 'q', 'r', 's', 't'});
         ConsumerRecord<byte[], byte[]> consumerRecord = new ConsumerRecord<>("topic1", 2, 3L, 4L,
-            TimestampType.CREATE_TIME, 0L, 5, 6, key, value, headers);
+            TimestampType.CREATE_TIME, 5, 6, key, value, headers, Optional.empty());
         MirrorSourceTask mirrorSourceTask = new MirrorSourceTask(null, null, "cluster7",
                 new DefaultReplicationPolicy(), 50);
         SourceRecord sourceRecord = mirrorSourceTask.convertRecord(consumerRecord);
@@ -111,9 +112,9 @@ public class MirrorSourceTaskTest {
             new RecordHeader(headerKey, "value".getBytes()),
         });
         consumerRecordsList.add(new ConsumerRecord<>(topicName, 0, 0, System.currentTimeMillis(),
-                TimestampType.CREATE_TIME, 0L, key1.length, value1.length, key1, value1, headers));
+                TimestampType.CREATE_TIME, key1.length, value1.length, key1, value1, headers, Optional.empty()));
         consumerRecordsList.add(new ConsumerRecord<>(topicName, 1, 1, System.currentTimeMillis(),
-                TimestampType.CREATE_TIME, 0L, key2.length, value2.length, key2, value2, headers));
+                TimestampType.CREATE_TIME, key2.length, value2.length, key2, value2, headers, Optional.empty()));
         ConsumerRecords<byte[], byte[]> consumerRecords =
                 new ConsumerRecords<>(Collections.singletonMap(new TopicPartition(topicName, 0), consumerRecordsList));
 
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/WorkerErrantRecordReporter.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/WorkerErrantRecordReporter.java
index b709e28..aa1d0be 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/WorkerErrantRecordReporter.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/WorkerErrantRecordReporter.java
@@ -33,6 +33,7 @@ import org.slf4j.LoggerFactory;
 
 import java.util.LinkedList;
 import java.util.List;
+import java.util.Optional;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
@@ -95,8 +96,8 @@ public class WorkerErrantRecordReporter implements ErrantRecordReporter {
             int valLength = value != null ? value.length : -1;
 
             consumerRecord = new ConsumerRecord<>(record.topic(), record.kafkaPartition(),
-                record.kafkaOffset(), record.timestamp(), record.timestampType(), -1L, keyLength,
-                valLength, key, value, headers);
+                record.kafkaOffset(), record.timestamp(), record.timestampType(), keyLength,
+                valLength, key, value, headers, Optional.empty());
         }
 
         Future<Void> future = retryWithToleranceOperator.executeFailed(Stage.TASK_PUT, SinkTask.class, consumerRecord, error);
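
The reporter rebuilds the `ConsumerRecord` without a checksum while keeping the null-safe size computation; a compact hedged sketch of that pattern in isolation (wrapper class and sample bytes are hypothetical):

    import java.util.Optional;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.common.header.internals.RecordHeaders;
    import org.apache.kafka.common.record.TimestampType;

    public class ReporterSketch {
        public static void main(String[] args) {
            byte[] key = null;                               // e.g. a keyless record
            byte[] value = "payload".getBytes();
            int keyLength = key != null ? key.length : -1;   // -1 marks a missing key/value
            int valLength = value != null ? value.length : -1;
            ConsumerRecord<byte[], byte[]> rec = new ConsumerRecord<>(
                "topic", 0, 10L, System.currentTimeMillis(), TimestampType.CREATE_TIME,
                keyLength, valLength, key, value, new RecordHeaders(), Optional.empty());
            System.out.println(rec.serializedKeySize() + " / " + rec.serializedValueSize());
        }
    }
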
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
index a7d68a6..b7492d3 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
@@ -75,6 +75,7 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
@@ -1111,8 +1112,10 @@ public class WorkerSinkTaskTest {
                 long timestamp = RecordBatch.NO_TIMESTAMP;
                 TimestampType timestampType = TimestampType.NO_TIMESTAMP_TYPE;
                 List<ConsumerRecord<byte[], byte[]>> records = new ArrayList<>();
-                records.add(new ConsumerRecord<>(TOPIC, PARTITION, FIRST_OFFSET + recordsReturnedTp1 + 1, timestamp, timestampType, 0L, 0, 0, RAW_KEY, RAW_VALUE));
-                records.add(new ConsumerRecord<>(TOPIC, PARTITION3, FIRST_OFFSET + recordsReturnedTp3 + 1, timestamp, timestampType, 0L, 0, 0, RAW_KEY, RAW_VALUE));
+                records.add(new ConsumerRecord<>(TOPIC, PARTITION, FIRST_OFFSET + recordsReturnedTp1 + 1, timestamp, timestampType,
+                    0, 0, RAW_KEY, RAW_VALUE, new RecordHeaders(), Optional.empty()));
+                records.add(new ConsumerRecord<>(TOPIC, PARTITION3, FIRST_OFFSET + recordsReturnedTp3 + 1, timestamp, timestampType,
+                    0, 0, RAW_KEY, RAW_VALUE, new RecordHeaders(), Optional.empty()));
                 recordsReturnedTp1 += 1;
                 recordsReturnedTp3 += 1;
                 return new ConsumerRecords<>(Collections.singletonMap(new TopicPartition(TOPIC, PARTITION), records));
@@ -1495,9 +1498,9 @@ public class WorkerSinkTaskTest {
 
         expectConsumerPoll(Arrays.asList(
             new ConsumerRecord<>(TOPIC, PARTITION, FIRST_OFFSET + recordsReturnedTp1 + 1, RecordBatch.NO_TIMESTAMP, TimestampType.NO_TIMESTAMP_TYPE,
-                0L, 0, 0, keyA.getBytes(), valueA.getBytes(encodingA), headersA),
+                0, 0, keyA.getBytes(), valueA.getBytes(encodingA), headersA, Optional.empty()),
             new ConsumerRecord<>(TOPIC, PARTITION, FIRST_OFFSET + recordsReturnedTp1 + 2, RecordBatch.NO_TIMESTAMP, TimestampType.NO_TIMESTAMP_TYPE,
-                0L, 0, 0, keyB.getBytes(), valueB.getBytes(encodingB), headersB)
+                0, 0, keyB.getBytes(), valueB.getBytes(encodingB), headersB, Optional.empty())
         ));
 
         expectTransformation(2, null);
@@ -1614,7 +1617,8 @@ public class WorkerSinkTaskTest {
             () -> {
                 List<ConsumerRecord<byte[], byte[]>> records = new ArrayList<>();
                 for (int i = 0; i < numMessages; i++)
-                    records.add(new ConsumerRecord<>(TOPIC, PARTITION, FIRST_OFFSET + recordsReturnedTp1 + i, timestamp, timestampType, 0L, 0, 0, RAW_KEY, RAW_VALUE, headers));
+                    records.add(new ConsumerRecord<>(TOPIC, PARTITION, FIRST_OFFSET + recordsReturnedTp1 + i, timestamp, timestampType,
+                        0, 0, RAW_KEY, RAW_VALUE, headers, Optional.empty()));
                 recordsReturnedTp1 += numMessages;
                 return new ConsumerRecords<>(
                         numMessages > 0 ?
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java
index 336f24b..7b4474b 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java
@@ -64,6 +64,7 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.fail;
@@ -555,9 +556,8 @@ public class WorkerSinkTaskThreadedTest extends ThreadedTest {
                 ConsumerRecords<byte[], byte[]> records = new ConsumerRecords<>(
                         Collections.singletonMap(
                                 new TopicPartition(TOPIC, PARTITION),
-                                Arrays.asList(
-                                        new ConsumerRecord<>(TOPIC, PARTITION, FIRST_OFFSET + recordsReturned, TIMESTAMP, TIMESTAMP_TYPE, 0L, 0, 0, RAW_KEY, RAW_VALUE)
-                                )));
+                                Arrays.asList(new ConsumerRecord<>(TOPIC, PARTITION, FIRST_OFFSET + recordsReturned, TIMESTAMP, TIMESTAMP_TYPE,
+                                    0, 0, RAW_KEY, RAW_VALUE, new RecordHeaders(), Optional.empty()))));
                 recordsReturned++;
                 return records;
             });
@@ -586,9 +586,8 @@ public class WorkerSinkTaskThreadedTest extends ThreadedTest {
                 ConsumerRecords<byte[], byte[]> records = new ConsumerRecords<>(
                         Collections.singletonMap(
                                 new TopicPartition(TOPIC, PARTITION),
-                                Arrays.asList(
-                                        new ConsumerRecord<>(TOPIC, PARTITION, FIRST_OFFSET + recordsReturned, TIMESTAMP, TIMESTAMP_TYPE, 0L, 0, 0, RAW_KEY, RAW_VALUE)
-                                )));
+                                Arrays.asList(new ConsumerRecord<>(TOPIC, PARTITION, FIRST_OFFSET + recordsReturned, TIMESTAMP, TIMESTAMP_TYPE,
+                                    0, 0, RAW_KEY, RAW_VALUE, new RecordHeaders(), Optional.empty()))));
                 recordsReturned++;
                 return records;
             });
@@ -617,8 +616,8 @@ public class WorkerSinkTaskThreadedTest extends ThreadedTest {
                 ConsumerRecords<byte[], byte[]> records = new ConsumerRecords<>(
                         Collections.singletonMap(
                                 new TopicPartition(TOPIC, PARTITION),
-                                Arrays.asList(
-                                        new ConsumerRecord<>(TOPIC, PARTITION, FIRST_OFFSET + recordsReturned, TIMESTAMP, TIMESTAMP_TYPE, 0L, 0, 0, RAW_KEY, RAW_VALUE)
+                                Arrays.asList(new ConsumerRecord<>(TOPIC, PARTITION, FIRST_OFFSET + recordsReturned, TIMESTAMP, TIMESTAMP_TYPE,
+                                    0, 0, RAW_KEY, RAW_VALUE, new RecordHeaders(), Optional.empty())
                                 )));
                 recordsReturned++;
                 return records;
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
index aefc3a9..4d358ca 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
@@ -1435,7 +1435,7 @@ public class WorkerSourceTaskTest extends ThreadedTest {
                 for (org.apache.kafka.clients.producer.Callback cb : producerCallbacks.getValues()) {
                     if (sendSuccess) {
                         cb.onCompletion(new RecordMetadata(new TopicPartition("foo", 0), 0, 0,
-                            0L, 0L, 0, 0), null);
+                            0L, 0, 0), null);
                     } else {
                         cb.onCompletion(null, new TopicAuthorizationException("foo"));
                     }
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/errors/ProcessingContextTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/errors/ProcessingContextTest.java
index 3e5e082..89f1013 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/errors/ProcessingContextTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/errors/ProcessingContextTest.java
@@ -48,7 +48,7 @@ public class ProcessingContextTest {
         Future<Void> result = context.report();
         fs.forEach(f -> {
             assertFalse(result.isDone());
-            f.complete(new RecordMetadata(new TopicPartition("t", 0), 0, 0, 0, 0L, 0, 0));
+            f.complete(new RecordMetadata(new TopicPartition("t", 0), 0, 0, 0, 0, 0));
         });
         assertTrue(result.isDone());
     }
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java
index 4504d39..d3960e5 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java
@@ -21,6 +21,7 @@ import org.apache.kafka.clients.admin.NewTopic;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.header.internals.RecordHeaders;
 import org.apache.kafka.common.record.TimestampType;
 import org.apache.kafka.common.config.ConfigException;
 import org.apache.kafka.connect.data.Field;
@@ -55,6 +56,7 @@ import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Supplier;
 
@@ -464,11 +466,16 @@ public class KafkaConfigBackingStoreTest {
     public void testRestoreTargetState() throws Exception {
         expectConfigure();
         List<ConsumerRecord<String, byte[]>> existingRecords = Arrays.asList(
-                new ConsumerRecord<>(TOPIC, 0, 0, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, CONNECTOR_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(0)),
-                new ConsumerRecord<>(TOPIC, 0, 1, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, TASK_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(1)),
-                new ConsumerRecord<>(TOPIC, 0, 2, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, TASK_CONFIG_KEYS.get(1), CONFIGS_SERIALIZED.get(2)),
-                new ConsumerRecord<>(TOPIC, 0, 3, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, TARGET_STATE_KEYS.get(0), CONFIGS_SERIALIZED.get(3)),
-                new ConsumerRecord<>(TOPIC, 0, 4, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, COMMIT_TASKS_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(4)));
+                new ConsumerRecord<>(TOPIC, 0, 0, 0L, TimestampType.CREATE_TIME, 0, 0, CONNECTOR_CONFIG_KEYS.get(0),
+                        CONFIGS_SERIALIZED.get(0), new RecordHeaders(), Optional.empty()),
+                new ConsumerRecord<>(TOPIC, 0, 1, 0L, TimestampType.CREATE_TIME, 0, 0, TASK_CONFIG_KEYS.get(0),
+                        CONFIGS_SERIALIZED.get(1), new RecordHeaders(), Optional.empty()),
+                new ConsumerRecord<>(TOPIC, 0, 2, 0L, TimestampType.CREATE_TIME, 0, 0, TASK_CONFIG_KEYS.get(1),
+                        CONFIGS_SERIALIZED.get(2), new RecordHeaders(), Optional.empty()),
+                new ConsumerRecord<>(TOPIC, 0, 3, 0L, TimestampType.CREATE_TIME, 0, 0, TARGET_STATE_KEYS.get(0),
+                        CONFIGS_SERIALIZED.get(3), new RecordHeaders(), Optional.empty()),
+                new ConsumerRecord<>(TOPIC, 0, 4, 0L, TimestampType.CREATE_TIME, 0, 0, COMMIT_TASKS_CONFIG_KEYS.get(0),
+                        CONFIGS_SERIALIZED.get(4), new RecordHeaders(), Optional.empty()));
         LinkedHashMap<byte[], Struct> deserialized = new LinkedHashMap<>();
         deserialized.put(CONFIGS_SERIALIZED.get(0), CONNECTOR_CONFIG_STRUCTS.get(0));
         deserialized.put(CONFIGS_SERIALIZED.get(1), TASK_CONFIG_STRUCTS.get(0));
@@ -506,10 +513,14 @@ public class KafkaConfigBackingStoreTest {
 
         expectConfigure();
         List<ConsumerRecord<String, byte[]>> existingRecords = Arrays.asList(
-                new ConsumerRecord<>(TOPIC, 0, 0, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, CONNECTOR_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(0)),
-                new ConsumerRecord<>(TOPIC, 0, 1, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, TASK_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(1)),
-                new ConsumerRecord<>(TOPIC, 0, 2, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, TASK_CONFIG_KEYS.get(1), CONFIGS_SERIALIZED.get(2)),
-                new ConsumerRecord<>(TOPIC, 0, 3, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, COMMIT_TASKS_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(3)));
+                new ConsumerRecord<>(TOPIC, 0, 0, 0L, TimestampType.CREATE_TIME, 0, 0, CONNECTOR_CONFIG_KEYS.get(0),
+                        CONFIGS_SERIALIZED.get(0), new RecordHeaders(), Optional.empty()),
+                new ConsumerRecord<>(TOPIC, 0, 1, 0L, TimestampType.CREATE_TIME, 0, 0, TASK_CONFIG_KEYS.get(0),
+                        CONFIGS_SERIALIZED.get(1), new RecordHeaders(), Optional.empty()),
+                new ConsumerRecord<>(TOPIC, 0, 2, 0L, TimestampType.CREATE_TIME, 0, 0, TASK_CONFIG_KEYS.get(1),
+                        CONFIGS_SERIALIZED.get(2), new RecordHeaders(), Optional.empty()),
+                new ConsumerRecord<>(TOPIC, 0, 3, 0L, TimestampType.CREATE_TIME, 0, 0, COMMIT_TASKS_CONFIG_KEYS.get(0),
+                        CONFIGS_SERIALIZED.get(3), new RecordHeaders(), Optional.empty()));
         LinkedHashMap<byte[], Struct> deserialized = new LinkedHashMap<>();
         deserialized.put(CONFIGS_SERIALIZED.get(0), CONNECTOR_CONFIG_STRUCTS.get(0));
         deserialized.put(CONFIGS_SERIALIZED.get(1), TASK_CONFIG_STRUCTS.get(0));
@@ -549,10 +560,14 @@ public class KafkaConfigBackingStoreTest {
 
         expectConfigure();
         List<ConsumerRecord<String, byte[]>> existingRecords = Arrays.asList(
-                new ConsumerRecord<>(TOPIC, 0, 0, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, CONNECTOR_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(0)),
-                new ConsumerRecord<>(TOPIC, 0, 1, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, TASK_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(1)),
-                new ConsumerRecord<>(TOPIC, 0, 2, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, TASK_CONFIG_KEYS.get(1), CONFIGS_SERIALIZED.get(2)),
-                new ConsumerRecord<>(TOPIC, 0, 3, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, COMMIT_TASKS_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(3)));
+                new ConsumerRecord<>(TOPIC, 0, 0, 0L, TimestampType.CREATE_TIME, 0, 0, CONNECTOR_CONFIG_KEYS.get(0),
+                        CONFIGS_SERIALIZED.get(0), new RecordHeaders(), Optional.empty()),
+                new ConsumerRecord<>(TOPIC, 0, 1, 0L, TimestampType.CREATE_TIME, 0, 0, TASK_CONFIG_KEYS.get(0),
+                        CONFIGS_SERIALIZED.get(1), new RecordHeaders(), Optional.empty()),
+                new ConsumerRecord<>(TOPIC, 0, 2, 0L, TimestampType.CREATE_TIME, 0, 0, TASK_CONFIG_KEYS.get(1),
+                        CONFIGS_SERIALIZED.get(2), new RecordHeaders(), Optional.empty()),
+                new ConsumerRecord<>(TOPIC, 0, 3, 0L, TimestampType.CREATE_TIME, 0, 0, COMMIT_TASKS_CONFIG_KEYS.get(0),
+                        CONFIGS_SERIALIZED.get(3), new RecordHeaders(), Optional.empty()));
         LinkedHashMap<byte[], Struct> deserialized = new LinkedHashMap<>();
         deserialized.put(CONFIGS_SERIALIZED.get(0), CONNECTOR_CONFIG_STRUCTS.get(0));
         deserialized.put(CONFIGS_SERIALIZED.get(1), TASK_CONFIG_STRUCTS.get(0));
@@ -607,11 +622,16 @@ public class KafkaConfigBackingStoreTest {
     public void testRestoreTargetStateUnexpectedDeletion() throws Exception {
         expectConfigure();
         List<ConsumerRecord<String, byte[]>> existingRecords = Arrays.asList(
-                new ConsumerRecord<>(TOPIC, 0, 0, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, CONNECTOR_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(0)),
-                new ConsumerRecord<>(TOPIC, 0, 1, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, TASK_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(1)),
-                new ConsumerRecord<>(TOPIC, 0, 2, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, TASK_CONFIG_KEYS.get(1), CONFIGS_SERIALIZED.get(2)),
-                new ConsumerRecord<>(TOPIC, 0, 3, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, TARGET_STATE_KEYS.get(0), CONFIGS_SERIALIZED.get(3)),
-                new ConsumerRecord<>(TOPIC, 0, 4, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, COMMIT_TASKS_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(4)));
+                new ConsumerRecord<>(TOPIC, 0, 0, 0L, TimestampType.CREATE_TIME, 0, 0, CONNECTOR_CONFIG_KEYS.get(0),
+                        CONFIGS_SERIALIZED.get(0), new RecordHeaders(), Optional.empty()),
+                new ConsumerRecord<>(TOPIC, 0, 1, 0L, TimestampType.CREATE_TIME, 0, 0, TASK_CONFIG_KEYS.get(0),
+                        CONFIGS_SERIALIZED.get(1), new RecordHeaders(), Optional.empty()),
+                new ConsumerRecord<>(TOPIC, 0, 2, 0L, TimestampType.CREATE_TIME, 0, 0, TASK_CONFIG_KEYS.get(1),
+                        CONFIGS_SERIALIZED.get(2), new RecordHeaders(), Optional.empty()),
+                new ConsumerRecord<>(TOPIC, 0, 3, 0L, TimestampType.CREATE_TIME, 0, 0, TARGET_STATE_KEYS.get(0),
+                        CONFIGS_SERIALIZED.get(3), new RecordHeaders(), Optional.empty()),
+                new ConsumerRecord<>(TOPIC, 0, 4, 0L, TimestampType.CREATE_TIME, 0, 0, COMMIT_TASKS_CONFIG_KEYS.get(0),
+                        CONFIGS_SERIALIZED.get(4), new RecordHeaders(), Optional.empty()));
         LinkedHashMap<byte[], Struct> deserialized = new LinkedHashMap<>();
         deserialized.put(CONFIGS_SERIALIZED.get(0), CONNECTOR_CONFIG_STRUCTS.get(0));
         deserialized.put(CONFIGS_SERIALIZED.get(1), TASK_CONFIG_STRUCTS.get(0));
@@ -651,14 +671,21 @@ public class KafkaConfigBackingStoreTest {
         expectConfigure();
         // Overwrite each type at least once to ensure we see the latest data after loading
         List<ConsumerRecord<String, byte[]>> existingRecords = Arrays.asList(
-                new ConsumerRecord<>(TOPIC, 0, 0, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, CONNECTOR_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(0)),
-                new ConsumerRecord<>(TOPIC, 0, 1, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, TASK_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(1)),
-                new ConsumerRecord<>(TOPIC, 0, 2, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, TASK_CONFIG_KEYS.get(1), CONFIGS_SERIALIZED.get(2)),
-                new ConsumerRecord<>(TOPIC, 0, 3, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, CONNECTOR_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(3)),
-                new ConsumerRecord<>(TOPIC, 0, 4, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, COMMIT_TASKS_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(4)),
+                new ConsumerRecord<>(TOPIC, 0, 0, 0L, TimestampType.CREATE_TIME, 0, 0, CONNECTOR_CONFIG_KEYS.get(0),
+                        CONFIGS_SERIALIZED.get(0), new RecordHeaders(), Optional.empty()),
+                new ConsumerRecord<>(TOPIC, 0, 1, 0L, TimestampType.CREATE_TIME, 0, 0, TASK_CONFIG_KEYS.get(0),
+                        CONFIGS_SERIALIZED.get(1), new RecordHeaders(), Optional.empty()),
+                new ConsumerRecord<>(TOPIC, 0, 2, 0L, TimestampType.CREATE_TIME, 0, 0, TASK_CONFIG_KEYS.get(1),
+                        CONFIGS_SERIALIZED.get(2), new RecordHeaders(), Optional.empty()),
+                new ConsumerRecord<>(TOPIC, 0, 3, 0L, TimestampType.CREATE_TIME, 0, 0, CONNECTOR_CONFIG_KEYS.get(0),
+                        CONFIGS_SERIALIZED.get(3), new RecordHeaders(), Optional.empty()),
+                new ConsumerRecord<>(TOPIC, 0, 4, 0L, TimestampType.CREATE_TIME, 0, 0, COMMIT_TASKS_CONFIG_KEYS.get(0),
+                        CONFIGS_SERIALIZED.get(4), new RecordHeaders(), Optional.empty()),
                 // Connector after root update should make it through, task update shouldn't
-                new ConsumerRecord<>(TOPIC, 0, 5, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, CONNECTOR_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(5)),
-                new ConsumerRecord<>(TOPIC, 0, 6, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, TASK_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(6)));
+                new ConsumerRecord<>(TOPIC, 0, 5, 0L, TimestampType.CREATE_TIME, 0, 0, CONNECTOR_CONFIG_KEYS.get(0),
+                        CONFIGS_SERIALIZED.get(5), new RecordHeaders(), Optional.empty()),
+                new ConsumerRecord<>(TOPIC, 0, 6, 0L, TimestampType.CREATE_TIME, 0, 0, TASK_CONFIG_KEYS.get(0),
+                        CONFIGS_SERIALIZED.get(6), new RecordHeaders(), Optional.empty()));
         LinkedHashMap<byte[], Struct> deserialized = new LinkedHashMap<>();
         deserialized.put(CONFIGS_SERIALIZED.get(0), CONNECTOR_CONFIG_STRUCTS.get(0));
         deserialized.put(CONFIGS_SERIALIZED.get(1), TASK_CONFIG_STRUCTS.get(0));
@@ -707,12 +734,18 @@ public class KafkaConfigBackingStoreTest {
         expectConfigure();
         // Overwrite each type at least once to ensure we see the latest data after loading
         List<ConsumerRecord<String, byte[]>> existingRecords = Arrays.asList(
-                new ConsumerRecord<>(TOPIC, 0, 0, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, CONNECTOR_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(0)),
-                new ConsumerRecord<>(TOPIC, 0, 1, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, TASK_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(1)),
-                new ConsumerRecord<>(TOPIC, 0, 2, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, TASK_CONFIG_KEYS.get(1), CONFIGS_SERIALIZED.get(2)),
-                new ConsumerRecord<>(TOPIC, 0, 3, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, CONNECTOR_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(3)),
-                new ConsumerRecord<>(TOPIC, 0, 4, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, TARGET_STATE_KEYS.get(0), CONFIGS_SERIALIZED.get(4)),
-                new ConsumerRecord<>(TOPIC, 0, 5, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, COMMIT_TASKS_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(5)));
+                new ConsumerRecord<>(TOPIC, 0, 0, 0L, TimestampType.CREATE_TIME, 0, 0, CONNECTOR_CONFIG_KEYS.get(0),
+                        CONFIGS_SERIALIZED.get(0), new RecordHeaders(), Optional.empty()),
+                new ConsumerRecord<>(TOPIC, 0, 1, 0L, TimestampType.CREATE_TIME, 0, 0, TASK_CONFIG_KEYS.get(0),
+                        CONFIGS_SERIALIZED.get(1), new RecordHeaders(), Optional.empty()),
+                new ConsumerRecord<>(TOPIC, 0, 2, 0L, TimestampType.CREATE_TIME, 0, 0, TASK_CONFIG_KEYS.get(1),
+                        CONFIGS_SERIALIZED.get(2), new RecordHeaders(), Optional.empty()),
+                new ConsumerRecord<>(TOPIC, 0, 3, 0L, TimestampType.CREATE_TIME, 0, 0, CONNECTOR_CONFIG_KEYS.get(0),
+                        CONFIGS_SERIALIZED.get(3), new RecordHeaders(), Optional.empty()),
+                new ConsumerRecord<>(TOPIC, 0, 4, 0L, TimestampType.CREATE_TIME, 0, 0, TARGET_STATE_KEYS.get(0),
+                        CONFIGS_SERIALIZED.get(4), new RecordHeaders(), Optional.empty()),
+                new ConsumerRecord<>(TOPIC, 0, 5, 0L, TimestampType.CREATE_TIME, 0, 0, COMMIT_TASKS_CONFIG_KEYS.get(0),
+                        CONFIGS_SERIALIZED.get(5), new RecordHeaders(), Optional.empty()));
 
         LinkedHashMap<byte[], Struct> deserialized = new LinkedHashMap<>();
         deserialized.put(CONFIGS_SERIALIZED.get(0), CONNECTOR_CONFIG_STRUCTS.get(0));
@@ -752,15 +785,23 @@ public class KafkaConfigBackingStoreTest {
         expectConfigure();
         // Overwrite each type at least once to ensure we see the latest data after loading
         List<ConsumerRecord<String, byte[]>> existingRecords = Arrays.asList(
-            new ConsumerRecord<>(TOPIC, 0, 0, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, CONNECTOR_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(0)),
-            new ConsumerRecord<>(TOPIC, 0, 1, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, TASK_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(1)),
-            new ConsumerRecord<>(TOPIC, 0, 2, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, TASK_CONFIG_KEYS.get(1), CONFIGS_SERIALIZED.get(2)),
-            new ConsumerRecord<>(TOPIC, 0, 3, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, CONNECTOR_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(3)),
-            new ConsumerRecord<>(TOPIC, 0, 4, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, COMMIT_TASKS_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(4)),
+            new ConsumerRecord<>(TOPIC, 0, 0, 0L, TimestampType.CREATE_TIME, 0, 0, CONNECTOR_CONFIG_KEYS.get(0),
+                    CONFIGS_SERIALIZED.get(0), new RecordHeaders(), Optional.empty()),
+            new ConsumerRecord<>(TOPIC, 0, 1, 0L, TimestampType.CREATE_TIME, 0, 0, TASK_CONFIG_KEYS.get(0),
+                    CONFIGS_SERIALIZED.get(1), new RecordHeaders(), Optional.empty()),
+            new ConsumerRecord<>(TOPIC, 0, 2, 0L, TimestampType.CREATE_TIME, 0, 0, TASK_CONFIG_KEYS.get(1),
+                    CONFIGS_SERIALIZED.get(2), new RecordHeaders(), Optional.empty()),
+            new ConsumerRecord<>(TOPIC, 0, 3, 0L, TimestampType.CREATE_TIME, 0, 0, CONNECTOR_CONFIG_KEYS.get(0),
+                    CONFIGS_SERIALIZED.get(3), new RecordHeaders(), Optional.empty()),
+            new ConsumerRecord<>(TOPIC, 0, 4, 0L, TimestampType.CREATE_TIME, 0, 0, COMMIT_TASKS_CONFIG_KEYS.get(0),
+                    CONFIGS_SERIALIZED.get(4), new RecordHeaders(), Optional.empty()),
             // Connector after root update should make it through, task update shouldn't
-            new ConsumerRecord<>(TOPIC, 0, 5, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, CONNECTOR_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(5)),
-            new ConsumerRecord<>(TOPIC, 0, 6, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, TASK_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(6)),
-            new ConsumerRecord<>(TOPIC, 0, 7, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, COMMIT_TASKS_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(7)));
+            new ConsumerRecord<>(TOPIC, 0, 5, 0L, TimestampType.CREATE_TIME, 0, 0, CONNECTOR_CONFIG_KEYS.get(0),
+                    CONFIGS_SERIALIZED.get(5), new RecordHeaders(), Optional.empty()),
+            new ConsumerRecord<>(TOPIC, 0, 6, 0L, TimestampType.CREATE_TIME, 0, 0, TASK_CONFIG_KEYS.get(0),
+                    CONFIGS_SERIALIZED.get(6), new RecordHeaders(), Optional.empty()),
+            new ConsumerRecord<>(TOPIC, 0, 7, 0L, TimestampType.CREATE_TIME, 0, 0, COMMIT_TASKS_CONFIG_KEYS.get(0),
+                    CONFIGS_SERIALIZED.get(7), new RecordHeaders(), Optional.empty()));
         LinkedHashMap<byte[], Struct> deserialized = new LinkedHashMap<>();
         deserialized.put(CONFIGS_SERIALIZED.get(0), CONNECTOR_CONFIG_STRUCTS.get(0));
         deserialized.put(CONFIGS_SERIALIZED.get(1), TASK_CONFIG_STRUCTS.get(0));
@@ -807,12 +848,16 @@ public class KafkaConfigBackingStoreTest {
 
         expectConfigure();
         List<ConsumerRecord<String, byte[]>> existingRecords = Arrays.asList(
-                new ConsumerRecord<>(TOPIC, 0, 0, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, CONNECTOR_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(0)),
+                new ConsumerRecord<>(TOPIC, 0, 0, 0L, TimestampType.CREATE_TIME, 0, 0, CONNECTOR_CONFIG_KEYS.get(0),
+                        CONFIGS_SERIALIZED.get(0), new RecordHeaders(), Optional.empty()),
                 // This is the record that has been compacted:
                 //new ConsumerRecord<>(TOPIC, 0, 1, TASK_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(1)),
-                new ConsumerRecord<>(TOPIC, 0, 2, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, TASK_CONFIG_KEYS.get(1), CONFIGS_SERIALIZED.get(2)),
-                new ConsumerRecord<>(TOPIC, 0, 4, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, COMMIT_TASKS_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(4)),
-                new ConsumerRecord<>(TOPIC, 0, 5, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, TASK_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(5)));
+                new ConsumerRecord<>(TOPIC, 0, 2, 0L, TimestampType.CREATE_TIME, 0, 0, TASK_CONFIG_KEYS.get(1),
+                        CONFIGS_SERIALIZED.get(2), new RecordHeaders(), Optional.empty()),
+                new ConsumerRecord<>(TOPIC, 0, 4, 0L, TimestampType.CREATE_TIME, 0, 0, COMMIT_TASKS_CONFIG_KEYS.get(0),
+                        CONFIGS_SERIALIZED.get(4), new RecordHeaders(), Optional.empty()),
+                new ConsumerRecord<>(TOPIC, 0, 5, 0L, TimestampType.CREATE_TIME, 0, 0, TASK_CONFIG_KEYS.get(0),
+                        CONFIGS_SERIALIZED.get(5), new RecordHeaders(), Optional.empty()));
         LinkedHashMap<byte[], Struct> deserialized = new LinkedHashMap<>();
         deserialized.put(CONFIGS_SERIALIZED.get(0), CONNECTOR_CONFIG_STRUCTS.get(0));
         deserialized.put(CONFIGS_SERIALIZED.get(2), TASK_CONFIG_STRUCTS.get(0));
@@ -966,8 +1011,11 @@ public class KafkaConfigBackingStoreTest {
         EasyMock.expect(storeLog.readToEnd())
                 .andAnswer(() -> {
                     TestFuture<Void> future = new TestFuture<>();
-                    for (Map.Entry<String, byte[]> entry : serializedConfigs.entrySet())
-                        capturedConsumedCallback.getValue().onCompletion(null, new ConsumerRecord<>(TOPIC, 0, logOffset++, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, entry.getKey(), entry.getValue()));
+                    for (Map.Entry<String, byte[]> entry : serializedConfigs.entrySet()) {
+                        capturedConsumedCallback.getValue().onCompletion(null,
+                            new ConsumerRecord<>(TOPIC, 0, logOffset++, 0L, TimestampType.CREATE_TIME, 0, 0,
+                                entry.getKey(), entry.getValue(), new RecordHeaders(), Optional.empty()));
+                    }
                     future.resolveOnGet((Void) null);
                     return future;
                 });
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStoreTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStoreTest.java
index d216068..07eadc8 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStoreTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStoreTest.java
@@ -22,6 +22,7 @@ import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.header.internals.RecordHeaders;
 import org.apache.kafka.common.record.TimestampType;
 import org.apache.kafka.connect.runtime.distributed.DistributedConfig;
 import org.apache.kafka.connect.util.Callback;
@@ -46,6 +47,7 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
@@ -149,10 +151,14 @@ public class KafkaOffsetBackingStoreTest {
     public void testReloadOnStart() throws Exception {
         expectConfigure();
         expectStart(Arrays.asList(
-                new ConsumerRecord<>(TOPIC, 0, 0, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, TP0_KEY.array(), TP0_VALUE.array()),
-                new ConsumerRecord<>(TOPIC, 1, 0, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, TP1_KEY.array(), TP1_VALUE.array()),
-                new ConsumerRecord<>(TOPIC, 0, 1, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, TP0_KEY.array(), TP0_VALUE_NEW.array()),
-                new ConsumerRecord<>(TOPIC, 1, 1, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, TP1_KEY.array(), TP1_VALUE_NEW.array())
+                new ConsumerRecord<>(TOPIC, 0, 0, 0L, TimestampType.CREATE_TIME, 0, 0, TP0_KEY.array(), TP0_VALUE.array(),
+                    new RecordHeaders(), Optional.empty()),
+                new ConsumerRecord<>(TOPIC, 1, 0, 0L, TimestampType.CREATE_TIME, 0, 0, TP1_KEY.array(), TP1_VALUE.array(),
+                    new RecordHeaders(), Optional.empty()),
+                new ConsumerRecord<>(TOPIC, 0, 1, 0L, TimestampType.CREATE_TIME, 0, 0, TP0_KEY.array(), TP0_VALUE_NEW.array(),
+                    new RecordHeaders(), Optional.empty()),
+                new ConsumerRecord<>(TOPIC, 1, 1, 0L, TimestampType.CREATE_TIME, 0, 0, TP1_KEY.array(), TP1_VALUE_NEW.array(),
+                    new RecordHeaders(), Optional.empty())
         ));
         expectStop();
         expectClusterId();
@@ -196,8 +202,12 @@ public class KafkaOffsetBackingStoreTest {
         final Capture<Callback<Void>> secondGetReadToEndCallback = EasyMock.newCapture();
         storeLog.readToEnd(EasyMock.capture(secondGetReadToEndCallback));
         PowerMock.expectLastCall().andAnswer(() -> {
-            capturedConsumedCallback.getValue().onCompletion(null, new ConsumerRecord<>(TOPIC, 0, 0, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, TP0_KEY.array(), TP0_VALUE.array()));
-            capturedConsumedCallback.getValue().onCompletion(null, new ConsumerRecord<>(TOPIC, 1, 0, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, TP1_KEY.array(), TP1_VALUE.array()));
+            capturedConsumedCallback.getValue().onCompletion(null,
+                new ConsumerRecord<>(TOPIC, 0, 0, 0L, TimestampType.CREATE_TIME, 0, 0, TP0_KEY.array(), TP0_VALUE.array(),
+                    new RecordHeaders(), Optional.empty()));
+            capturedConsumedCallback.getValue().onCompletion(null,
+                new ConsumerRecord<>(TOPIC, 1, 0, 0L, TimestampType.CREATE_TIME, 0, 0, TP1_KEY.array(), TP1_VALUE.array(),
+                    new RecordHeaders(), Optional.empty()));
             secondGetReadToEndCallback.getValue().onCompletion(null, null);
             return null;
         });
@@ -206,8 +216,12 @@ public class KafkaOffsetBackingStoreTest {
         final Capture<Callback<Void>> thirdGetReadToEndCallback = EasyMock.newCapture();
         storeLog.readToEnd(EasyMock.capture(thirdGetReadToEndCallback));
         PowerMock.expectLastCall().andAnswer(() -> {
-            capturedConsumedCallback.getValue().onCompletion(null, new ConsumerRecord<>(TOPIC, 0, 1, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, TP0_KEY.array(), TP0_VALUE_NEW.array()));
-            capturedConsumedCallback.getValue().onCompletion(null, new ConsumerRecord<>(TOPIC, 1, 1, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, TP1_KEY.array(), TP1_VALUE_NEW.array()));
+            capturedConsumedCallback.getValue().onCompletion(null,
+                new ConsumerRecord<>(TOPIC, 0, 1, 0L, TimestampType.CREATE_TIME, 0, 0, TP0_KEY.array(), TP0_VALUE_NEW.array(),
+                    new RecordHeaders(), Optional.empty()));
+            capturedConsumedCallback.getValue().onCompletion(null,
+                new ConsumerRecord<>(TOPIC, 1, 1, 0L, TimestampType.CREATE_TIME, 0, 0, TP1_KEY.array(), TP1_VALUE_NEW.array(),
+                    new RecordHeaders(), Optional.empty()));
             thirdGetReadToEndCallback.getValue().onCompletion(null, null);
             return null;
         });
@@ -271,8 +285,12 @@ public class KafkaOffsetBackingStoreTest {
         final Capture<Callback<Void>> secondGetReadToEndCallback = EasyMock.newCapture();
         storeLog.readToEnd(EasyMock.capture(secondGetReadToEndCallback));
         PowerMock.expectLastCall().andAnswer(() -> {
-            capturedConsumedCallback.getValue().onCompletion(null, new ConsumerRecord<>(TOPIC, 0, 0, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, null, TP0_VALUE.array()));
-            capturedConsumedCallback.getValue().onCompletion(null, new ConsumerRecord<>(TOPIC, 1, 0, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, TP1_KEY.array(), null));
+            capturedConsumedCallback.getValue().onCompletion(null,
+                new ConsumerRecord<>(TOPIC, 0, 0, 0L, TimestampType.CREATE_TIME, 0, 0, null, TP0_VALUE.array(),
+                    new RecordHeaders(), Optional.empty()));
+            capturedConsumedCallback.getValue().onCompletion(null,
+                new ConsumerRecord<>(TOPIC, 1, 0, 0L, TimestampType.CREATE_TIME, 0, 0, TP1_KEY.array(), null,
+                    new RecordHeaders(), Optional.empty()));
             secondGetReadToEndCallback.getValue().onCompletion(null, null);
             return null;
         });
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaStatusBackingStoreTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaStatusBackingStoreTest.java
index 4c0c7f9..f19be75 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaStatusBackingStoreTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaStatusBackingStoreTest.java
@@ -21,6 +21,7 @@ import org.apache.kafka.clients.producer.Callback;
 import org.apache.kafka.common.config.ConfigException;
 import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.errors.UnknownServerException;
+import org.apache.kafka.common.header.internals.RecordHeaders;
 import org.apache.kafka.common.record.TimestampType;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.connect.data.Schema;
@@ -45,6 +46,7 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
+import java.util.Optional;
 
 import static org.easymock.EasyMock.anyObject;
 import static org.easymock.EasyMock.capture;
@@ -436,7 +438,7 @@ public class KafkaStatusBackingStoreTest extends EasyMockSupport {
 
     private static ConsumerRecord<String, byte[]> consumerRecord(long offset, String key, byte[] value) {
         return new ConsumerRecord<>(STATUS_TOPIC, 0, offset, System.currentTimeMillis(),
-                TimestampType.CREATE_TIME, 0L, 0, 0, key, value);
+                TimestampType.CREATE_TIME, 0, 0, key, value, new RecordHeaders(), Optional.empty());
     }
 
 }
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/util/KafkaBasedLogTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/util/KafkaBasedLogTest.java
index e36f2a9..8fae57e 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/util/KafkaBasedLogTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/util/KafkaBasedLogTest.java
@@ -33,6 +33,7 @@ import org.apache.kafka.common.errors.LeaderNotAvailableException;
 import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.errors.UnsupportedVersionException;
 import org.apache.kafka.common.errors.WakeupException;
+import org.apache.kafka.common.header.internals.RecordHeaders;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.record.TimestampType;
 import org.apache.kafka.common.utils.Time;
@@ -56,6 +57,7 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
@@ -187,12 +189,14 @@ public class KafkaBasedLogTest {
             consumer.scheduleNopPollTask();
             consumer.scheduleNopPollTask();
             consumer.schedulePollTask(() ->
-                consumer.addRecord(new ConsumerRecord<>(TOPIC, 0, 0, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, TP0_KEY, TP0_VALUE))
+                consumer.addRecord(new ConsumerRecord<>(TOPIC, 0, 0, 0L, TimestampType.CREATE_TIME, 0, 0, TP0_KEY, TP0_VALUE,
+                    new RecordHeaders(), Optional.empty()))
             );
             consumer.scheduleNopPollTask();
             consumer.scheduleNopPollTask();
             consumer.schedulePollTask(() ->
-                consumer.addRecord(new ConsumerRecord<>(TOPIC, 1, 0, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, TP1_KEY, TP1_VALUE))
+                consumer.addRecord(new ConsumerRecord<>(TOPIC, 1, 0, 0L, TimestampType.CREATE_TIME, 0, 0, TP1_KEY, TP1_VALUE,
+                    new RecordHeaders(), Optional.empty()))
             );
             consumer.schedulePollTask(finishedLatch::countDown);
         });
@@ -306,13 +310,17 @@ public class KafkaBasedLogTest {
             consumer.scheduleNopPollTask();
             consumer.scheduleNopPollTask();
             consumer.schedulePollTask(() -> {
-                consumer.addRecord(new ConsumerRecord<>(TOPIC, 0, 0, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, TP0_KEY, TP0_VALUE));
-                consumer.addRecord(new ConsumerRecord<>(TOPIC, 0, 1, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, TP0_KEY, TP0_VALUE_NEW));
-                consumer.addRecord(new ConsumerRecord<>(TOPIC, 1, 0, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, TP1_KEY, TP1_VALUE));
+                consumer.addRecord(new ConsumerRecord<>(TOPIC, 0, 0, 0L, TimestampType.CREATE_TIME, 0, 0, TP0_KEY, TP0_VALUE,
+                    new RecordHeaders(), Optional.empty()));
+                consumer.addRecord(new ConsumerRecord<>(TOPIC, 0, 1, 0L, TimestampType.CREATE_TIME, 0, 0, TP0_KEY, TP0_VALUE_NEW,
+                    new RecordHeaders(), Optional.empty()));
+                consumer.addRecord(new ConsumerRecord<>(TOPIC, 1, 0, 0L, TimestampType.CREATE_TIME, 0, 0, TP1_KEY, TP1_VALUE,
+                    new RecordHeaders(), Optional.empty()));
             });
 
             consumer.schedulePollTask(() ->
-                consumer.addRecord(new ConsumerRecord<>(TOPIC, 1, 1, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, TP1_KEY, TP1_VALUE_NEW)));
+                consumer.addRecord(new ConsumerRecord<>(TOPIC, 1, 1, 0L, TimestampType.CREATE_TIME, 0, 0, TP1_KEY, TP1_VALUE_NEW,
+                    new RecordHeaders(), Optional.empty())));
 
             // Already have FutureCallback that should be invoked/awaited, so no need for follow up finishedLatch
         });
@@ -357,8 +365,10 @@ public class KafkaBasedLogTest {
             consumer.scheduleNopPollTask();
             consumer.scheduleNopPollTask();
             consumer.schedulePollTask(() -> {
-                consumer.addRecord(new ConsumerRecord<>(TOPIC, 0, 0, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, TP0_KEY, TP0_VALUE_NEW));
-                consumer.addRecord(new ConsumerRecord<>(TOPIC, 1, 0, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, TP0_KEY, TP0_VALUE_NEW));
+                consumer.addRecord(new ConsumerRecord<>(TOPIC, 0, 0, 0L, TimestampType.CREATE_TIME, 0, 0, TP0_KEY, TP0_VALUE_NEW,
+                    new RecordHeaders(), Optional.empty()));
+                consumer.addRecord(new ConsumerRecord<>(TOPIC, 1, 0, 0L, TimestampType.CREATE_TIME, 0, 0, TP0_KEY, TP0_VALUE_NEW,
+                    new RecordHeaders(), Optional.empty()));
             });
 
             consumer.schedulePollTask(finishedLatch::countDown);
@@ -411,8 +421,10 @@ public class KafkaBasedLogTest {
             consumer.scheduleNopPollTask();
             consumer.scheduleNopPollTask();
             consumer.schedulePollTask(() -> {
-                consumer.addRecord(new ConsumerRecord<>(TOPIC, 0, 0, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, TP0_KEY, TP0_VALUE));
-                consumer.addRecord(new ConsumerRecord<>(TOPIC, 1, 0, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, TP0_KEY, TP0_VALUE_NEW));
+                consumer.addRecord(new ConsumerRecord<>(TOPIC, 0, 0, 0L, TimestampType.CREATE_TIME, 0, 0, TP0_KEY, TP0_VALUE,
+                    new RecordHeaders(), Optional.empty()));
+                consumer.addRecord(new ConsumerRecord<>(TOPIC, 1, 0, 0L, TimestampType.CREATE_TIME, 0, 0, TP0_KEY, TP0_VALUE_NEW,
+                    new RecordHeaders(), Optional.empty()));
             });
 
             consumer.schedulePollTask(finishedLatch::countDown);
diff --git a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
index ce58214..1269ff5 100755
--- a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
+++ b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
@@ -22,7 +22,7 @@ import java.nio.charset.StandardCharsets
 import java.time.Duration
 import java.util.concurrent.CountDownLatch
 import java.util.regex.Pattern
-import java.util.{Collections, Locale, Map, Properties, Random}
+import java.util.{Collections, Locale, Map, Optional, Properties, Random}
 import com.typesafe.scalalogging.LazyLogging
 import joptsimple._
 import kafka.utils.Implicits._
@@ -35,7 +35,6 @@ import org.apache.kafka.common.requests.ListOffsetsRequest
 import org.apache.kafka.common.serialization.{ByteArrayDeserializer, Deserializer}
 import org.apache.kafka.common.utils.Utils
 
-import scala.annotation.nowarn
 import scala.jdk.CollectionConverters._
 
 /**
@@ -112,8 +111,8 @@ object ConsoleConsumer extends Logging {
       }
       messageCount += 1
       try {
-        formatter.writeTo(new ConsumerRecord(msg.topic, msg.partition, msg.offset, msg.timestamp,
-                                             msg.timestampType, 0, 0, 0, msg.key, msg.value, msg.headers), output)
+        formatter.writeTo(new ConsumerRecord(msg.topic, msg.partition, msg.offset, msg.timestamp, msg.timestampType,
+          0, 0, msg.key, msg.value, msg.headers, Optional.empty[Integer]), output)
       } catch {
         case e: Throwable =>
           if (skipMessageOnError) {
@@ -614,18 +613,3 @@ class NoOpMessageFormatter extends MessageFormatter {
   def writeTo(consumerRecord: ConsumerRecord[Array[Byte], Array[Byte]], output: PrintStream): Unit = {}
 }
 
-class ChecksumMessageFormatter extends MessageFormatter {
-  private var topicStr: String = _
-
-  override def configure(configs: Map[String, _]): Unit = {
-    topicStr = if (configs.containsKey("topic"))
-      configs.get("topic").toString + ":"
-    else
-      ""
-  }
-
-  @nowarn("cat=deprecation")
-  def writeTo(consumerRecord: ConsumerRecord[Array[Byte], Array[Byte]], output: PrintStream): Unit = {
-    output.println(topicStr + "checksum:" + consumerRecord.checksum)
-  }
-}
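
For anyone who used the removed ChecksumMessageFormatter with kafka-console-consumer.sh: custom
formatters are now written against the Java org.apache.kafka.common.MessageFormatter interface
referenced in the upgrade notes below. A minimal sketch follows; record-level checksums no longer
exist, so this hypothetical PartitionMessageFormatter prints the partition instead, reusing the
topic-prefix convention of the removed class.

    import java.io.PrintStream;
    import java.util.Map;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.common.MessageFormatter;

    // Hypothetical replacement formatter: prints "<topic>:partition:<n>" for each record.
    public class PartitionMessageFormatter implements MessageFormatter {
        private String topicStr = "";

        @Override
        public void configure(Map<String, ?> configs) {
            Object topic = configs.get("topic");
            topicStr = topic == null ? "" : topic + ":";
        }

        @Override
        public void writeTo(ConsumerRecord<byte[], byte[]> consumerRecord, PrintStream output) {
            output.println(topicStr + "partition:" + consumerRecord.partition());
        }
    }
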
diff --git a/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala b/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala
index 70188e4..1f7e9d1 100644
--- a/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala
+++ b/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala
@@ -35,7 +35,6 @@ import org.apache.kafka.common.{KafkaException, TopicPartition}
 import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
 
-import scala.annotation.nowarn
 import scala.jdk.CollectionConverters._
 import scala.collection.mutable.Buffer
 import scala.concurrent.ExecutionException
@@ -102,7 +101,6 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
    * 1. Send with null key/value/partition-id should be accepted; send with null topic should be rejected.
    * 2. Last message of the non-blocking send should return the correct offset metadata
    */
-  @nowarn("cat=deprecation")
   @Test
   def testSendOffset(): Unit = {
     val producer = createProducer(brokerList)
@@ -123,7 +121,6 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
             case 2 => assertEquals(metadata.serializedValueSize, "value".getBytes(StandardCharsets.UTF_8).length)
             case _ => assertTrue(metadata.serializedValueSize > 0)
           }
-          assertNotEquals(metadata.checksum(), 0)
           offset += 1
         } else {
           fail(s"Send callback returns the following exception: $exception")
diff --git a/core/src/test/scala/kafka/tools/DefaultMessageFormatterTest.scala b/core/src/test/scala/kafka/tools/DefaultMessageFormatterTest.scala
index ea72f4f..f2ca243 100644
--- a/core/src/test/scala/kafka/tools/DefaultMessageFormatterTest.scala
+++ b/core/src/test/scala/kafka/tools/DefaultMessageFormatterTest.scala
@@ -30,6 +30,7 @@ import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.params.ParameterizedTest
 import org.junit.jupiter.params.provider.{Arguments, MethodSource}
 
+import java.util.Optional
 import scala.jdk.CollectionConverters._
 
 class DefaultMessageFormatterTest {
@@ -210,12 +211,12 @@ object DefaultMessageFormatterTest {
       offset,
       timestamp,
       timestampType,
-      0L,
       0,
       0,
       if (key == null) null else key.getBytes(StandardCharsets.UTF_8),
       if (value == null) null else value.getBytes(StandardCharsets.UTF_8),
-      new RecordHeaders(headers.asJava))
+      new RecordHeaders(headers.asJava),
+      Optional.empty[Integer])
   }
 
   private def withResource[Resource <: Closeable, Result](resource: Resource)(handler: Resource => Result): Result = {
diff --git a/core/src/test/scala/unit/kafka/tools/ConsoleConsumerTest.scala b/core/src/test/scala/unit/kafka/tools/ConsoleConsumerTest.scala
index 17a7459..91ccbcc 100644
--- a/core/src/test/scala/unit/kafka/tools/ConsoleConsumerTest.scala
+++ b/core/src/test/scala/unit/kafka/tools/ConsoleConsumerTest.scala
@@ -19,8 +19,7 @@ package kafka.tools
 
 import java.io.{ByteArrayOutputStream, PrintStream}
 import java.nio.file.Files
-import java.util.{HashMap, Map => JMap}
-
+import java.util.{HashMap, Optional, Map => JMap}
 import kafka.tools.ConsoleConsumer.ConsumerWrapper
 import kafka.utils.{Exit, TestUtils}
 import org.apache.kafka.clients.consumer.{ConsumerRecord, MockConsumer, OffsetResetStrategy}
@@ -31,6 +30,7 @@ import org.apache.kafka.test.MockDeserializer
 import org.mockito.Mockito._
 import org.mockito.ArgumentMatchers
 import ArgumentMatchers._
+import org.apache.kafka.common.header.internals.RecordHeaders
 import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.api.{BeforeEach, Test}
 
@@ -498,7 +498,8 @@ class ConsoleConsumerTest {
     assertEquals("NO_TIMESTAMP\tPartition:0\tOffset:123\tkey\tvalue\n", out.toString)
 
     out = new ByteArrayOutputStream()
-    val record2 = new ConsumerRecord("topic", 0, 123, 123L, TimestampType.CREATE_TIME, 321L, -1, -1, "key".getBytes, "value".getBytes)
+    val record2 = new ConsumerRecord("topic", 0, 123, 123L, TimestampType.CREATE_TIME, -1, -1, "key".getBytes, "value".getBytes,
+      new RecordHeaders(), Optional.empty[Integer])
     formatter.writeTo(record2, new PrintStream(out))
     assertEquals("CreateTime:123\tPartition:0\tOffset:123\tkey\tvalue\n", out.toString)
     formatter.close()
@@ -515,22 +516,4 @@ class ConsoleConsumerTest {
     assertEquals("", out.toString)
   }
 
-  @Test
-  def testChecksumMessageFormatter(): Unit = {
-    val record = new ConsumerRecord("topic", 0, 123, "key".getBytes, "value".getBytes)
-    val formatter = new ChecksumMessageFormatter()
-    val configs: JMap[String, String] = new HashMap()
-
-    formatter.configure(configs)
-    var out = new ByteArrayOutputStream()
-    formatter.writeTo(record, new PrintStream(out))
-    assertEquals("checksum:-1\n", out.toString)
-
-    configs.put("topic", "topic1")
-    formatter.configure(configs)
-    out = new ByteArrayOutputStream()
-    formatter.writeTo(record, new PrintStream(out))
-    assertEquals("topic1:checksum:-1\n", out.toString)
-  }
-
 }
diff --git a/docs/upgrade.html b/docs/upgrade.html
index dd83908..6a632e0 100644
--- a/docs/upgrade.html
+++ b/docs/upgrade.html
@@ -27,6 +27,8 @@
         or updating the application not to use internal classes.</li>
     <li>The Streams API removed all deprecated APIs that were deprecated in version 2.5.0 or earlier.
         For a complete list of removed APIs compare the detailed Kafka Streams upgrade notes.</li>
+    <li>Kafka Streams no longer has a compile-time dependency on the "connect:json" module (<a href="https://issues.apache.org/jira/browse/KAFKA-5146">KAFKA-5146</a>).
+        Projects that were relying on this transitive dependency will have to declare it explicitly.</li>
     <li>A number of deprecated classes, methods and tools have been removed from the <code>clients</code>, <code>core</code> and <code>tools</code> modules:</li>
     <ul>
         <li>The Scala <code>Authorizer</code>, <code>SimpleAclAuthorizer</code> and related classes have been removed. Please use the Java <code>Authorizer</code>
@@ -53,11 +55,15 @@
             instead.</li>
         <li>The Scala <code>kafka.common.MessageFormatter</code> was removed. Please use the Java <code>org.apache.kafka.common.MessageFormatter</code>.</li>
         <li>The <code>MessageFormatter.init(Properties)</code> method was removed. Please use <code>configure(Map)</code> instead.</li>
-        <li>The deprecated <code>org.apache.kafka.clients.consumer.internals.PartitionAssignor</code> class has been removed. Please use
+        <li>The <code>checksum()</code> method has been removed from <code>ConsumerRecord</code> and <code>RecordMetadata</code>. Message
+            format v2, which has been the default since 0.11, moved the checksum from the individual record to the record batch, so these
+            methods no longer make sense and no replacement exists.</li>
+        <li>The <code>ChecksumMessageFormatter</code> class was removed. It was not part of the public API, but it may have been used
+            with <code>kafka-console-consumer.sh</code>. It reported the checksum of each record, a value that message format v2 no
+            longer maintains per record.</li>
+        <li>The <code>org.apache.kafka.clients.consumer.internals.PartitionAssignor</code> class has been removed. Please use
             <code>org.apache.kafka.clients.consumer.ConsumerPartitionAssignor</code> instead.</li>
-        <li>Kafka Streams no longer has a compile time dependency on "connect:json" module (<a href="https://issues.apache.org/jira/browse/KAFKA-5146">KAFKA-5146</a>).
-            Projects that were relying on this transitive dependency will have to explicitly declare it.</li>
-        <li>The deprecated <code>quota.producer.default</code> and <code>quota.consumer.default</code> configurations were removed (<a href="https://issues.apache.org/jira/browse/KAFKA-12591">KAFKA-12591</a>).
+        <li>The <code>quota.producer.default</code> and <code>quota.consumer.default</code> configurations were removed (<a href="https://issues.apache.org/jira/browse/KAFKA-12591">KAFKA-12591</a>).
             Dynamic quota defaults must be used instead.</li>
     </ul>
 </ul>
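
The constructor migration described above is mechanical. A minimal sketch, using the
non-deprecated ConsumerRecord constructor this commit introduces (the commented-out call shows
the deprecated shape, whose long argument after the timestamp type was the checksum):

    import java.util.Optional;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.common.header.internals.RecordHeaders;
    import org.apache.kafka.common.record.TimestampType;

    public class ConsumerRecordMigration {
        public static void main(String[] args) {
            byte[] key = "key".getBytes();
            byte[] value = "value".getBytes();

            // Deprecated for removal in 4.0 (checksum of 0L passed explicitly):
            // new ConsumerRecord<>("topic", 0, 0L, 0L, TimestampType.CREATE_TIME, 0L, 3, 5, key, value);

            // Non-deprecated: no checksum; headers and the optional leader epoch are explicit.
            ConsumerRecord<byte[], byte[]> record = new ConsumerRecord<>(
                "topic", 0, 0L, 0L, TimestampType.CREATE_TIME,
                3, 5, key, value, new RecordHeaders(), Optional.empty());
            System.out.println(record);
        }
    }
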
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordDeserializer.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordDeserializer.java
index 269dff2..20f1449 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordDeserializer.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordDeserializer.java
@@ -25,6 +25,8 @@ import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.slf4j.Logger;
 
+import java.util.Optional;
+
 import static org.apache.kafka.streams.StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG;
 
 class RecordDeserializer {
@@ -59,12 +61,12 @@ class RecordDeserializer {
                 rawRecord.offset(),
                 rawRecord.timestamp(),
                 TimestampType.CREATE_TIME,
-                rawRecord.checksum(),
                 rawRecord.serializedKeySize(),
                 rawRecord.serializedValueSize(),
                 sourceNode.deserializeKey(rawRecord.topic(), rawRecord.headers(), rawRecord.key()),
                 sourceNode.deserializeValue(rawRecord.topic(), rawRecord.headers(), rawRecord.value()),
-                rawRecord.headers()
+                rawRecord.headers(),
+                Optional.empty()
             );
         } catch (final Exception deserializationException) {
             final DeserializationExceptionHandler.DeserializationHandlerResponse response;
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RecordConverters.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RecordConverters.java
index 8305d52..1f2e593 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RecordConverters.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RecordConverters.java
@@ -38,7 +38,6 @@ public final class RecordConverters {
             record.offset(),
             timestamp,
             record.timestampType(),
-            record.checksum(),
             record.serializedKeySize(),
             record.serializedValueSize(),
             record.key(),
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/LogAndSkipOnInvalidTimestampTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/LogAndSkipOnInvalidTimestampTest.java
index 93ea448..5474f9f 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/LogAndSkipOnInvalidTimestampTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/LogAndSkipOnInvalidTimestampTest.java
@@ -17,9 +17,12 @@
 package org.apache.kafka.streams.processor;
 
 import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.header.internals.RecordHeaders;
 import org.apache.kafka.common.record.TimestampType;
 import org.junit.Test;
 
+import java.util.Optional;
+
 import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.MatcherAssert.assertThat;
 
@@ -44,9 +47,10 @@ public class LogAndSkipOnInvalidTimestampTest extends TimestampExtractorTest {
                 TimestampType.NO_TIMESTAMP_TYPE,
                 0,
                 0,
-                0,
                 null,
-                null),
+                null,
+                new RecordHeaders(),
+                Optional.empty()),
             0
         );
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/TimestampExtractorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/TimestampExtractorTest.java
index bd235be..dca9a70 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/TimestampExtractorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/TimestampExtractorTest.java
@@ -17,8 +17,11 @@
 package org.apache.kafka.streams.processor;
 
 import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.header.internals.RecordHeaders;
 import org.apache.kafka.common.record.TimestampType;
 
+import java.util.Optional;
+
 import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.MatcherAssert.assertThat;
 
@@ -36,9 +39,10 @@ class TimestampExtractorTest {
                 TimestampType.NO_TIMESTAMP_TYPE,
                 0,
                 0,
-                0,
                 null,
-                null),
+                null,
+                new RecordHeaders(),
+                Optional.empty()),
             0
         );
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateTaskTest.java
index 3cc06be..e4bc600 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateTaskTest.java
@@ -18,6 +18,7 @@ package org.apache.kafka.streams.processor.internals;
 
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.header.internals.RecordHeaders;
 import org.apache.kafka.common.record.TimestampType;
 import org.apache.kafka.common.serialization.IntegerDeserializer;
 import org.apache.kafka.common.serialization.IntegerSerializer;
@@ -40,6 +41,7 @@ import java.io.File;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
 
 import static java.util.Arrays.asList;
@@ -148,7 +150,7 @@ public class GlobalStateTaskTest {
                                   final boolean failExpected) {
         final ConsumerRecord<byte[], byte[]> record = new ConsumerRecord<>(
             topic2, 1, 1, 0L, TimestampType.CREATE_TIME,
-            0L, 0, 0, key, recordValue
+            0, 0, key, recordValue, new RecordHeaders(), Optional.empty()
         );
         globalStateTask.initialize();
         try {
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordDeserializerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordDeserializerTest.java
index 4ae9d32..18d17ae 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordDeserializerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordDeserializerTest.java
@@ -26,6 +26,8 @@ import org.apache.kafka.common.record.TimestampType;
 import org.apache.kafka.common.utils.LogContext;
 import org.junit.Test;
 
+import java.util.Optional;
+
 import static org.junit.Assert.assertEquals;
 
 public class RecordDeserializerTest {
@@ -36,14 +38,13 @@ public class RecordDeserializerTest {
         1,
         10,
         TimestampType.LOG_APPEND_TIME,
-        5L,
         3,
         5,
         new byte[0],
         new byte[0],
-        headers);
+        headers,
+        Optional.empty());
 
-    @SuppressWarnings("deprecation")
     @Test
     public void shouldReturnConsumerRecordWithDeserializedValueWhenNoExceptions() {
         final RecordDeserializer recordDeserializer = new RecordDeserializer(
@@ -60,7 +61,6 @@ public class RecordDeserializerTest {
         assertEquals(rawRecord.topic(), record.topic());
         assertEquals(rawRecord.partition(), record.partition());
         assertEquals(rawRecord.offset(), record.offset());
-        assertEquals(rawRecord.checksum(), record.checksum());
         assertEquals("key", record.key());
         assertEquals("value", record.value());
         assertEquals(rawRecord.timestamp(), record.timestamp());
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java
index 895e045..142e85a 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java
@@ -19,6 +19,7 @@ package org.apache.kafka.streams.processor.internals;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.SerializationException;
+import org.apache.kafka.common.header.internals.RecordHeaders;
 import org.apache.kafka.common.record.TimestampType;
 import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.serialization.IntegerDeserializer;
@@ -45,6 +46,7 @@ import org.junit.Test;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
+import java.util.Optional;
 
 import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.MatcherAssert.assertThat;
@@ -103,9 +105,9 @@ public class RecordQueueTest {
 
         // add three out-of-order records with timestamps 2, 1, 3
         final List<ConsumerRecord<byte[], byte[]>> list1 = Arrays.asList(
-            new ConsumerRecord<>("topic", 1, 2, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue),
-            new ConsumerRecord<>("topic", 1, 1, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue),
-            new ConsumerRecord<>("topic", 1, 3, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue));
+            new ConsumerRecord<>("topic", 1, 2, 0L, TimestampType.CREATE_TIME, 0, 0, recordKey, recordValue, new RecordHeaders(), Optional.empty()),
+            new ConsumerRecord<>("topic", 1, 1, 0L, TimestampType.CREATE_TIME, 0, 0, recordKey, recordValue, new RecordHeaders(), Optional.empty()),
+            new ConsumerRecord<>("topic", 1, 3, 0L, TimestampType.CREATE_TIME, 0, 0, recordKey, recordValue, new RecordHeaders(), Optional.empty()));
 
         queue.addRawRecords(list1);
 
@@ -128,9 +130,9 @@ public class RecordQueueTest {
         // add three out-of-order records with timestamps 4, 1, 2
         // now with 3, 4, 1, 2
         final List<ConsumerRecord<byte[], byte[]>> list2 = Arrays.asList(
-            new ConsumerRecord<>("topic", 1, 4, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue),
-            new ConsumerRecord<>("topic", 1, 1, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue),
-            new ConsumerRecord<>("topic", 1, 2, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue));
+            new ConsumerRecord<>("topic", 1, 4, 0L, TimestampType.CREATE_TIME, 0, 0, recordKey, recordValue, new RecordHeaders(), Optional.empty()),
+            new ConsumerRecord<>("topic", 1, 1, 0L, TimestampType.CREATE_TIME, 0, 0, recordKey, recordValue, new RecordHeaders(), Optional.empty()),
+            new ConsumerRecord<>("topic", 1, 2, 0L, TimestampType.CREATE_TIME, 0, 0, recordKey, recordValue, new RecordHeaders(), Optional.empty()));
 
         queue.addRawRecords(list2);
 
@@ -161,9 +163,9 @@ public class RecordQueueTest {
 
         // add three more records with 4, 5, 6
         final List<ConsumerRecord<byte[], byte[]>> list3 = Arrays.asList(
-            new ConsumerRecord<>("topic", 1, 4, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue),
-            new ConsumerRecord<>("topic", 1, 5, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue),
-            new ConsumerRecord<>("topic", 1, 6, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue));
+            new ConsumerRecord<>("topic", 1, 4, 0L, TimestampType.CREATE_TIME, 0, 0, recordKey, recordValue, new RecordHeaders(), Optional.empty()),
+            new ConsumerRecord<>("topic", 1, 5, 0L, TimestampType.CREATE_TIME, 0, 0, recordKey, recordValue, new RecordHeaders(), Optional.empty()),
+            new ConsumerRecord<>("topic", 1, 6, 0L, TimestampType.CREATE_TIME, 0, 0, recordKey, recordValue, new RecordHeaders(), Optional.empty()));
 
         queue.addRawRecords(list3);
 
@@ -202,10 +204,14 @@ public class RecordQueueTest {
 
         // add four out-of-order records with timestamps 2, 1, 3, 4
         final List<ConsumerRecord<byte[], byte[]>> list1 = Arrays.asList(
-            new ConsumerRecord<>("topic", 1, 2, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue),
-            new ConsumerRecord<>("topic", 1, 1, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue),
-            new ConsumerRecord<>("topic", 1, 3, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue),
-            new ConsumerRecord<>("topic", 1, 4, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue));
+            new ConsumerRecord<>("topic", 1, 2, 0L, TimestampType.CREATE_TIME, 0, 0, recordKey, recordValue,
+                new RecordHeaders(), Optional.empty()),
+            new ConsumerRecord<>("topic", 1, 1, 0L, TimestampType.CREATE_TIME, 0, 0, recordKey, recordValue,
+                new RecordHeaders(), Optional.empty()),
+            new ConsumerRecord<>("topic", 1, 3, 0L, TimestampType.CREATE_TIME, 0, 0, recordKey, recordValue,
+                new RecordHeaders(), Optional.empty()),
+            new ConsumerRecord<>("topic", 1, 4, 0L, TimestampType.CREATE_TIME, 0, 0, recordKey, recordValue,
+                new RecordHeaders(), Optional.empty()));
 
         queue.addRawRecords(list1);
         assertThat(queue.partitionTime(), is(RecordQueue.UNKNOWN));
@@ -231,10 +237,14 @@ public class RecordQueueTest {
         assertThat(queue.partitionTime(), is(150L));
 
         final List<ConsumerRecord<byte[], byte[]>> list1 = Arrays.asList(
-            new ConsumerRecord<>("topic", 1, 200, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue),
-            new ConsumerRecord<>("topic", 1, 100, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue),
-            new ConsumerRecord<>("topic", 1, 300, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue),
-            new ConsumerRecord<>("topic", 1, 400, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue));
+            new ConsumerRecord<>("topic", 1, 200, 0L, TimestampType.CREATE_TIME, 0, 0, recordKey, recordValue,
+                new RecordHeaders(), Optional.empty()),
+            new ConsumerRecord<>("topic", 1, 100, 0L, TimestampType.CREATE_TIME, 0, 0, recordKey, recordValue,
+                new RecordHeaders(), Optional.empty()),
+            new ConsumerRecord<>("topic", 1, 300, 0L, TimestampType.CREATE_TIME, 0, 0, recordKey, recordValue,
+                new RecordHeaders(), Optional.empty()),
+            new ConsumerRecord<>("topic", 1, 400, 0L, TimestampType.CREATE_TIME, 0, 0, recordKey, recordValue,
+                new RecordHeaders(), Optional.empty()));
 
         queue.addRawRecords(list1);
         assertThat(queue.partitionTime(), is(150L));
@@ -253,7 +263,8 @@ public class RecordQueueTest {
     public void shouldThrowStreamsExceptionWhenKeyDeserializationFails() {
         final byte[] key = Serdes.Long().serializer().serialize("foo", 1L);
         final List<ConsumerRecord<byte[], byte[]>> records = Collections.singletonList(
-            new ConsumerRecord<>("topic", 1, 1, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, key, recordValue));
+            new ConsumerRecord<>("topic", 1, 1, 0L, TimestampType.CREATE_TIME, 0, 0, key, recordValue,
+                new RecordHeaders(), Optional.empty()));
 
         final StreamsException exception = assertThrows(
             StreamsException.class,
@@ -266,7 +277,8 @@ public class RecordQueueTest {
     public void shouldThrowStreamsExceptionWhenValueDeserializationFails() {
         final byte[] value = Serdes.Long().serializer().serialize("foo", 1L);
         final List<ConsumerRecord<byte[], byte[]>> records = Collections.singletonList(
-            new ConsumerRecord<>("topic", 1, 1, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, value));
+            new ConsumerRecord<>("topic", 1, 1, 0L, TimestampType.CREATE_TIME, 0, 0, recordKey, value,
+                new RecordHeaders(), Optional.empty()));
 
         final StreamsException exception = assertThrows(
             StreamsException.class,
@@ -279,7 +291,8 @@ public class RecordQueueTest {
     public void shouldNotThrowStreamsExceptionWhenKeyDeserializationFailsWithSkipHandler() {
         final byte[] key = Serdes.Long().serializer().serialize("foo", 1L);
         final List<ConsumerRecord<byte[], byte[]>> records = Collections.singletonList(
-            new ConsumerRecord<>("topic", 1, 1, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, key, recordValue));
+            new ConsumerRecord<>("topic", 1, 1, 0L, TimestampType.CREATE_TIME, 0, 0, key, recordValue,
+                new RecordHeaders(), Optional.empty()));
 
         queueThatSkipsDeserializeErrors.addRawRecords(records);
         assertEquals(0, queueThatSkipsDeserializeErrors.size());
@@ -289,7 +302,8 @@ public class RecordQueueTest {
     public void shouldNotThrowStreamsExceptionWhenValueDeserializationFailsWithSkipHandler() {
         final byte[] value = Serdes.Long().serializer().serialize("foo", 1L);
         final List<ConsumerRecord<byte[], byte[]>> records = Collections.singletonList(
-            new ConsumerRecord<>("topic", 1, 1, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, value));
+            new ConsumerRecord<>("topic", 1, 1, 0L, TimestampType.CREATE_TIME, 0, 0, recordKey, value,
+                new RecordHeaders(), Optional.empty()));
 
         queueThatSkipsDeserializeErrors.addRawRecords(records);
         assertEquals(0, queueThatSkipsDeserializeErrors.size());
@@ -298,7 +312,8 @@ public class RecordQueueTest {
     @Test
     public void shouldThrowOnNegativeTimestamp() {
         final List<ConsumerRecord<byte[], byte[]>> records = Collections.singletonList(
-            new ConsumerRecord<>("topic", 1, 1, -1L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue));
+            new ConsumerRecord<>("topic", 1, 1, -1L, TimestampType.CREATE_TIME, 0, 0, recordKey, recordValue,
+                new RecordHeaders(), Optional.empty()));
 
         final RecordQueue queue = new RecordQueue(
             new TopicPartition("topic", 1),
@@ -323,7 +338,8 @@ public class RecordQueueTest {
     @Test
     public void shouldDropOnNegativeTimestamp() {
         final List<ConsumerRecord<byte[], byte[]>> records = Collections.singletonList(
-            new ConsumerRecord<>("topic", 1, 1, -1L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue));
+            new ConsumerRecord<>("topic", 1, 1, -1L, TimestampType.CREATE_TIME, 0, 0, recordKey, recordValue,
+                new RecordHeaders(), Optional.empty()));
 
         final RecordQueue queue = new RecordQueue(
             new TopicPartition("topic", 1),
@@ -355,10 +371,14 @@ public class RecordQueueTest {
 
         // add four out-of-order records with timestamps 2, 1, 3, 4
         final List<ConsumerRecord<byte[], byte[]>> list1 = Arrays.asList(
-            new ConsumerRecord<>("topic", 1, 2, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue),
-            new ConsumerRecord<>("topic", 1, 1, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue),
-            new ConsumerRecord<>("topic", 1, 3, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue),
-            new ConsumerRecord<>("topic", 1, 4, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue));
+            new ConsumerRecord<>("topic", 1, 2, 0L, TimestampType.CREATE_TIME, 0, 0, recordKey, recordValue,
+                new RecordHeaders(), Optional.empty()),
+            new ConsumerRecord<>("topic", 1, 1, 0L, TimestampType.CREATE_TIME, 0, 0, recordKey, recordValue,
+                new RecordHeaders(), Optional.empty()),
+            new ConsumerRecord<>("topic", 1, 3, 0L, TimestampType.CREATE_TIME, 0, 0, recordKey, recordValue,
+                new RecordHeaders(), Optional.empty()),
+            new ConsumerRecord<>("topic", 1, 4, 0L, TimestampType.CREATE_TIME, 0, 0, recordKey, recordValue,
+                new RecordHeaders(), Optional.empty()));
 
         assertEquals(RecordQueue.UNKNOWN, timestampExtractor.partitionTime);
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
index 3a6a7c5..fa80e46 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
@@ -26,6 +26,7 @@ import org.apache.kafka.common.Metric;
 import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.header.internals.RecordHeaders;
 import org.apache.kafka.common.metrics.JmxReporter;
 import org.apache.kafka.common.metrics.KafkaMetric;
 import org.apache.kafka.common.metrics.KafkaMetricsContext;
@@ -81,6 +82,7 @@ import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Function;
@@ -2539,11 +2541,12 @@ public class StreamTaskTest {
             offset,
             offset, // use the offset as the timestamp
             TimestampType.CREATE_TIME,
-            0L,
             0,
             0,
             recordKey,
-            intSerializer.serialize(null, value)
+            intSerializer.serialize(null, value),
+            new RecordHeaders(),
+            Optional.empty()
         );
     }
 
@@ -2555,11 +2558,12 @@ public class StreamTaskTest {
             offset,
             offset, // use the offset as the timestamp
             TimestampType.CREATE_TIME,
-            0L,
             0,
             0,
             recordKey,
-            recordValue
+            recordValue,
+            new RecordHeaders(),
+            Optional.empty()
         );
     }
 
@@ -2570,11 +2574,12 @@ public class StreamTaskTest {
             offset,
             offset, // use the offset as the timestamp
             TimestampType.CREATE_TIME,
-            0L,
             0,
             0,
             new IntegerSerializer().serialize(topic1, key),
-            recordValue
+            recordValue,
+            new RecordHeaders(),
+            Optional.empty()
         );
     }
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
index 82b1996..80fb2c9 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
@@ -37,6 +37,7 @@ import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.ProducerFencedException;
 import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.header.internals.RecordHeaders;
 import org.apache.kafka.common.metrics.JmxReporter;
 import org.apache.kafka.common.metrics.KafkaMetric;
 import org.apache.kafka.common.metrics.KafkaMetricsContext;
@@ -1853,11 +1854,12 @@ public class StreamThreadTest {
             100L,
             100L,
             TimestampType.CREATE_TIME,
-            ConsumerRecord.NULL_CHECKSUM,
             "K".getBytes().length,
             "V".getBytes().length,
             "K".getBytes(),
-            "V".getBytes()));
+            "V".getBytes(),
+            new RecordHeaders(),
+            Optional.empty()));
 
         thread.runOnce();
 
@@ -1940,11 +1942,12 @@ public class StreamThreadTest {
             110L,
             110L,
             TimestampType.CREATE_TIME,
-            ConsumerRecord.NULL_CHECKSUM,
             "K".getBytes().length,
             "V".getBytes().length,
             "K".getBytes(),
-            "V".getBytes()));
+            "V".getBytes(),
+            new RecordHeaders(),
+            Optional.empty()));
 
         thread.runOnce();
 
@@ -2148,22 +2151,24 @@ public class StreamThreadTest {
             ++offset,
             -1,
             TimestampType.CREATE_TIME,
-            ConsumerRecord.NULL_CHECKSUM,
             -1,
             -1,
             new byte[0],
-            "I am not an integer.".getBytes()));
+            "I am not an integer.".getBytes(),
+            new RecordHeaders(),
+            Optional.empty()));
         mockConsumer.addRecord(new ConsumerRecord<>(
             t1p1.topic(),
             t1p1.partition(),
             ++offset,
             -1,
             TimestampType.CREATE_TIME,
-            ConsumerRecord.NULL_CHECKSUM,
             -1,
             -1,
             new byte[0],
-            "I am not an integer.".getBytes()));
+            "I am not an integer.".getBytes(),
+            new RecordHeaders(),
+            Optional.empty()));
 
         try (final LogCaptureAppender appender = LogCaptureAppender.createAndRegister(RecordDeserializer.class)) {
             thread.runOnce();
@@ -2860,11 +2865,12 @@ public class StreamThreadTest {
             offset,
             timestamp,
             TimestampType.CREATE_TIME,
-            ConsumerRecord.NULL_CHECKSUM,
             -1,
             -1,
             new byte[0],
-            new byte[0]));
+            new byte[0],
+            new RecordHeaders(),
+            Optional.empty()));
     }
 
     StandbyTask standbyTask(final TaskManager taskManager, final TopicPartition partition) {
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/testutil/ConsumerRecordUtil.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/testutil/ConsumerRecordUtil.java
index a702fdc..4a6fbbd 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/testutil/ConsumerRecordUtil.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/testutil/ConsumerRecordUtil.java
@@ -17,8 +17,11 @@
 package org.apache.kafka.streams.processor.internals.testutil;
 
 import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.header.internals.RecordHeaders;
 import org.apache.kafka.common.record.TimestampType;
 
+import java.util.Optional;
+
 public final class ConsumerRecordUtil {
     private ConsumerRecordUtil() {}
 
@@ -36,11 +39,12 @@ public final class ConsumerRecordUtil {
             offset,
             0L,
             TimestampType.CREATE_TIME,
-            0L,
             0,
             0,
             key,
-            value
+            value,
+            new RecordHeaders(),
+            Optional.empty()
         );
     }
 }
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RecordConvertersTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RecordConvertersTest.java
index bacbacd..e409ca1 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RecordConvertersTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RecordConvertersTest.java
@@ -17,10 +17,12 @@
 package org.apache.kafka.streams.state.internals;
 
 import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.header.internals.RecordHeaders;
 import org.apache.kafka.common.record.TimestampType;
 import org.junit.Test;
 
 import java.nio.ByteBuffer;
+import java.util.Optional;
 
 import static org.apache.kafka.streams.state.internals.RecordConverters.rawValueToTimestampedValue;
 import static org.junit.Assert.assertArrayEquals;
@@ -41,7 +43,8 @@ public class RecordConvertersTest {
         final long timestamp = 10L;
         final byte[] value = new byte[1];
         final ConsumerRecord<byte[], byte[]> inputRecord = new ConsumerRecord<>(
-                "topic", 1, 0, timestamp, TimestampType.CREATE_TIME, 0L, 0, 0, new byte[0], value);
+                "topic", 1, 0, timestamp, TimestampType.CREATE_TIME, 0, 0, new byte[0], value,
+                new RecordHeaders(), Optional.empty());
         final byte[] expectedValue = ByteBuffer.allocate(9).putLong(timestamp).put(value).array();
         final byte[] actualValue = timestampedValueConverter.convert(inputRecord).value();
         assertArrayEquals(expectedValue, actualValue);
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedKeyValueBufferTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedKeyValueBufferTest.java
index 0037fbb..7d420c7 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedKeyValueBufferTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedKeyValueBufferTest.java
@@ -47,6 +47,7 @@ import java.nio.ByteBuffer;
 import java.util.Collection;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.Optional;
 import java.util.Properties;
 import java.util.Random;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -387,9 +388,10 @@ public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer<S
                                  TimestampType.CREATE_TIME,
                                  -1,
                                  -1,
-                                 -1,
                                  "todelete".getBytes(UTF_8),
-                                 hexStringToByteArray(toDeleteBinaryValue)),
+                                 hexStringToByteArray(toDeleteBinaryValue),
+                                 new RecordHeaders(),
+                                 Optional.empty()),
             new ConsumerRecord<>("changelog-topic",
                                  0,
                                  1,
@@ -397,9 +399,10 @@ public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer<S
                                  TimestampType.CREATE_TIME,
                                  -1,
                                  -1,
-                                 -1,
                                  "asdf".getBytes(UTF_8),
-                                 hexStringToByteArray(asdfBinaryValue)),
+                                 hexStringToByteArray(asdfBinaryValue),
+                                 new RecordHeaders(),
+                                 Optional.empty()),
             new ConsumerRecord<>("changelog-topic",
                                  0,
                                  2,
@@ -407,9 +410,10 @@ public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer<S
                                  TimestampType.CREATE_TIME,
                                  -1,
                                  -1,
-                                 -1,
                                  "zxcv".getBytes(UTF_8),
-                                 hexStringToByteArray(zxcvBinaryValue1)),
+                                 hexStringToByteArray(zxcvBinaryValue1),
+                                 new RecordHeaders(),
+                                 Optional.empty()),
             new ConsumerRecord<>("changelog-topic",
                                  0,
                                  3,
@@ -417,9 +421,10 @@ public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer<S
                                  TimestampType.CREATE_TIME,
                                  -1,
                                  -1,
-                                 -1,
                                  "zxcv".getBytes(UTF_8),
-                                 hexStringToByteArray(zxcvBinaryValue2))
+                                 hexStringToByteArray(zxcvBinaryValue2),
+                                 new RecordHeaders(),
+                                 Optional.empty())
         ));
 
         assertThat(buffer.numRecords(), is(3));
@@ -434,9 +439,10 @@ public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer<S
                                  TimestampType.CREATE_TIME,
                                  -1,
                                  -1,
-                                 -1,
                                  "todelete".getBytes(UTF_8),
-                                 null)
+                                 null,
+                                 new RecordHeaders(),
+                                 Optional.empty())
         ));
 
         assertThat(buffer.numRecords(), is(2));
@@ -501,45 +507,45 @@ public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer<S
                                  0,
                                  999,
                                  TimestampType.CREATE_TIME,
-                                 -1L,
                                  -1,
                                  -1,
                                  "todelete".getBytes(UTF_8),
                                  hexStringToByteArray(toDeleteBinary),
-                                 v1FlagHeaders),
+                                 v1FlagHeaders,
+                                 Optional.empty()),
             new ConsumerRecord<>("changelog-topic",
                                  0,
                                  1,
                                  9999,
                                  TimestampType.CREATE_TIME,
-                                 -1L,
                                  -1,
                                  -1,
                                  "asdf".getBytes(UTF_8),
                                  hexStringToByteArray(asdfBinary),
-                                 v1FlagHeaders),
+                                 v1FlagHeaders,
+                                 Optional.empty()),
             new ConsumerRecord<>("changelog-topic",
                                  0,
                                  2,
                                  99,
                                  TimestampType.CREATE_TIME,
-                                 -1L,
                                  -1,
                                  -1,
                                  "zxcv".getBytes(UTF_8),
                                  hexStringToByteArray(zxcvBinary1),
-                                 v1FlagHeaders),
+                                 v1FlagHeaders,
+                                 Optional.empty()),
             new ConsumerRecord<>("changelog-topic",
                                  0,
                                  3,
                                  100,
                                  TimestampType.CREATE_TIME,
-                                 -1L,
                                  -1,
                                  -1,
                                  "zxcv".getBytes(UTF_8),
                                  hexStringToByteArray(zxcvBinary2),
-                                 v1FlagHeaders)
+                                 v1FlagHeaders,
+                                 Optional.empty())
         ));
 
         assertThat(buffer.numRecords(), is(3));
@@ -552,11 +558,12 @@ public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer<S
                                  3,
                                  3,
                                  TimestampType.CREATE_TIME,
-                                 -1L,
                                  -1,
                                  -1,
                                  "todelete".getBytes(UTF_8),
-                                 null)
+                                 null,
+                                 new RecordHeaders(),
+                                 Optional.empty())
         ));
 
         assertThat(buffer.numRecords(), is(2));
@@ -622,45 +629,45 @@ public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer<S
                                  0,
                                  999,
                                  TimestampType.CREATE_TIME,
-                                 -1L,
                                  -1,
                                  -1,
                                  "todelete".getBytes(UTF_8),
                                  hexStringToByteArray(toDeleteBinary),
-                                 v2FlagHeaders),
+                                 v2FlagHeaders,
+                                 Optional.empty()),
             new ConsumerRecord<>("changelog-topic",
                                  0,
                                  1,
                                  9999,
                                  TimestampType.CREATE_TIME,
-                                 -1L,
                                  -1,
                                  -1,
                                  "asdf".getBytes(UTF_8),
                                  hexStringToByteArray(asdfBinary),
-                                 v2FlagHeaders),
+                                 v2FlagHeaders,
+                                 Optional.empty()),
             new ConsumerRecord<>("changelog-topic",
                                  0,
                                  2,
                                  99,
                                  TimestampType.CREATE_TIME,
-                                 -1L,
                                  -1,
                                  -1,
                                  "zxcv".getBytes(UTF_8),
                                  hexStringToByteArray(zxcvBinary1),
-                                 v2FlagHeaders),
+                                 v2FlagHeaders,
+                                 Optional.empty()),
             new ConsumerRecord<>("changelog-topic",
                                  0,
                                  2,
                                  100,
                                  TimestampType.CREATE_TIME,
-                                 -1L,
                                  -1,
                                  -1,
                                  "zxcv".getBytes(UTF_8),
                                  hexStringToByteArray(zxcvBinary2),
-                                 v2FlagHeaders)
+                                 v2FlagHeaders,
+                                 Optional.empty())
         ));
 
         assertThat(buffer.numRecords(), is(3));
@@ -673,11 +680,12 @@ public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer<S
                                  3,
                                  3,
                                  TimestampType.CREATE_TIME,
-                                 -1L,
                                  -1,
                                  -1,
                                  "todelete".getBytes(UTF_8),
-                                 null)
+                                 null,
+                                 new RecordHeaders(),
+                                 Optional.empty())
         ));
 
         assertThat(buffer.numRecords(), is(2));
@@ -745,45 +753,45 @@ public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer<S
                                  0,
                                  999,
                                  TimestampType.CREATE_TIME,
-                                 -1L,
                                  -1,
                                  -1,
                                  "todelete".getBytes(UTF_8),
                                  hexStringToByteArray(toDeleteBinary),
-                                 headers),
+                                 headers,
+                                 Optional.empty()),
             new ConsumerRecord<>("changelog-topic",
                                  0,
                                  1,
                                  9999,
                                  TimestampType.CREATE_TIME,
-                                 -1L,
                                  -1,
                                  -1,
                                  "asdf".getBytes(UTF_8),
                                  hexStringToByteArray(asdfBinary),
-                                 headers),
+                                 headers,
+                                 Optional.empty()),
             new ConsumerRecord<>("changelog-topic",
                                  0,
                                  2,
                                  99,
                                  TimestampType.CREATE_TIME,
-                                 -1L,
                                  -1,
                                  -1,
                                  "zxcv".getBytes(UTF_8),
                                  hexStringToByteArray(zxcvBinary1),
-                                 headers),
+                                 headers,
+                                 Optional.empty()),
             new ConsumerRecord<>("changelog-topic",
                                  0,
                                  2,
                                  100,
                                  TimestampType.CREATE_TIME,
-                                 -1L,
                                  -1,
                                  -1,
                                  "zxcv".getBytes(UTF_8),
                                  hexStringToByteArray(zxcvBinary2),
-                                 headers)
+                                 headers,
+                                 Optional.empty())
         ));
 
         assertThat(buffer.numRecords(), is(3));
@@ -796,11 +804,12 @@ public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer<S
                                  3,
                                  3,
                                  TimestampType.CREATE_TIME,
-                                 -1L,
                                  -1,
                                  -1,
                                  "todelete".getBytes(UTF_8),
-                                 null)
+                                 null,
+                                 new RecordHeaders(),
+                                 Optional.empty())
         ));
 
         assertThat(buffer.numRecords(), is(2));
@@ -865,45 +874,45 @@ public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer<S
                                  0,
                                  999,
                                  TimestampType.CREATE_TIME,
-                                 -1L,
                                  -1,
                                  -1,
                                  "todelete".getBytes(UTF_8),
                                  hexStringToByteArray(toDeleteBinary),
-                                 headers),
+                                 headers,
+                                 Optional.empty()),
             new ConsumerRecord<>("changelog-topic",
                                  0,
                                  1,
                                  9999,
                                  TimestampType.CREATE_TIME,
-                                 -1L,
                                  -1,
                                  -1,
                                  "asdf".getBytes(UTF_8),
                                  hexStringToByteArray(asdfBinary),
-                                 headers),
+                                 headers,
+                                 Optional.empty()),
             new ConsumerRecord<>("changelog-topic",
                                  0,
                                  2,
                                  99,
                                  TimestampType.CREATE_TIME,
-                                 -1L,
                                  -1,
                                  -1,
                                  "zxcv".getBytes(UTF_8),
                                  hexStringToByteArray(zxcvBinary1),
-                                 headers),
+                                 headers,
+                                 Optional.empty()),
             new ConsumerRecord<>("changelog-topic",
                                  0,
                                  2,
                                  100,
                                  TimestampType.CREATE_TIME,
-                                 -1L,
                                  -1,
                                  -1,
                                  "zxcv".getBytes(UTF_8),
                                  hexStringToByteArray(zxcvBinary2),
-                                 headers)
+                                 headers,
+                                 Optional.empty())
         ));
 
         assertThat(buffer.numRecords(), is(3));
@@ -916,11 +925,12 @@ public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer<S
                                  3,
                                  3,
                                  TimestampType.CREATE_TIME,
-                                 -1L,
                                  -1,
                                  -1,
                                  "todelete".getBytes(UTF_8),
-                                 null)
+                                 null,
+                                 new RecordHeaders(),
+                                 Optional.empty())
         ));
 
         assertThat(buffer.numRecords(), is(2));
@@ -979,12 +989,12 @@ public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer<S
                                      0,
                                      999,
                                      TimestampType.CREATE_TIME,
-                                     -1L,
                                      -1,
                                      -1,
                                      "todelete".getBytes(UTF_8),
                                      ByteBuffer.allocate(Long.BYTES + todeleteValue.length).putLong(0L).put(todeleteValue).array(),
-                                     unknownFlagHeaders)
+                                     unknownFlagHeaders,
+                                     Optional.empty())
             ));
             fail("expected an exception");
         } catch (final IllegalArgumentException expected) {
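
Note: every hunk in the buffer test above makes the same mechanical change, so one
before/after sketch covers them all (values taken from the first call site; `key`,
`value` and `headers` stand in for the literal arguments and are illustrative):

    import java.util.Optional;
    import org.apache.kafka.common.header.internals.RecordHeaders;

    // Before: the deprecated constructor took a per-record checksum (-1L below).
    new ConsumerRecord<>("changelog-topic", 0, 0L, 999L, TimestampType.CREATE_TIME,
                         -1L, -1, -1, key, value, headers);

    // After: the checksum argument is gone, and the leader epoch (unknown in these
    // tests, hence Optional.empty()) is appended as the final parameter.
    new ConsumerRecord<>("changelog-topic", 0, 0L, 999L, TimestampType.CREATE_TIME,
                         -1, -1, key, value, headers, Optional.empty());

Call sites that previously used the short overload without headers now pass an
explicit `new RecordHeaders()` as well, which is why those hunks grow by two lines.
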
diff --git a/streams/src/test/java/org/apache/kafka/test/MockRestoreConsumer.java b/streams/src/test/java/org/apache/kafka/test/MockRestoreConsumer.java
index 244b35f..2398872 100644
--- a/streams/src/test/java/org/apache/kafka/test/MockRestoreConsumer.java
+++ b/streams/src/test/java/org/apache/kafka/test/MockRestoreConsumer.java
@@ -28,6 +28,7 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Map;
+import java.util.Optional;
 
 public class MockRestoreConsumer<K, V> extends MockConsumer<byte[], byte[]> {
     private final Serializer<K> keySerializer;
@@ -60,10 +61,10 @@ public class MockRestoreConsumer<K, V> extends MockConsumer<byte[], byte[]> {
     public void bufferRecord(final ConsumerRecord<K, V> record) {
         recordBuffer.add(
             new ConsumerRecord<>(record.topic(), record.partition(), record.offset(), record.timestamp(),
-                                 record.timestampType(), 0L, 0, 0,
+                                 record.timestampType(), 0, 0,
                                  keySerializer.serialize(record.topic(), record.headers(), record.key()),
                                  valueSerializer.serialize(record.topic(), record.headers(), record.value()),
-                                 record.headers()));
+                                 record.headers(), Optional.empty()));
         endOffset = record.offset();
 
         super.updateEndOffsets(Collections.singletonMap(assignedPartition, endOffset));
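
The trailing `Optional.empty()` in `bufferRecord` is the record's leader epoch;
`empty` means the epoch is unknown, the normal case for hand-built test records.
A minimal sketch of the round trip through the public `leaderEpoch()` accessor
(the record values here, and the `RecordHeaders`/`TimestampType` imports they
require, are assumptions for illustration only):

    ConsumerRecord<byte[], byte[]> r = new ConsumerRecord<>(
        "topic", 0, 0L, 0L, TimestampType.CREATE_TIME,
        0, 0, new byte[0], new byte[0], new RecordHeaders(), Optional.of(5));

    Optional<Integer> epoch = r.leaderEpoch(); // Optional.of(5) here;
                                               // Optional.empty() when unknown
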
diff --git a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
index f657209..bd93367 100644
--- a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
+++ b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
@@ -101,6 +101,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.NoSuchElementException;
 import java.util.Objects;
+import java.util.Optional;
 import java.util.Properties;
 import java.util.Queue;
 import java.util.Set;
@@ -606,12 +607,12 @@ public class TopologyTestDriver implements Closeable {
             offset,
             timestamp,
             TimestampType.CREATE_TIME,
-            (long) ConsumerRecord.NULL_CHECKSUM,
             key == null ? ConsumerRecord.NULL_SIZE : key.length,
             value == null ? ConsumerRecord.NULL_SIZE : value.length,
             key,
             value,
-            headers))
+            headers,
+            Optional.empty()))
         );
     }
 
@@ -663,12 +664,12 @@ public class TopologyTestDriver implements Closeable {
             offsetsByTopicOrPatternPartition.get(globalInputTopicPartition).incrementAndGet() - 1,
             timestamp,
             TimestampType.CREATE_TIME,
-            (long) ConsumerRecord.NULL_CHECKSUM,
             key == null ? ConsumerRecord.NULL_SIZE : key.length,
             value == null ? ConsumerRecord.NULL_SIZE : value.length,
             key,
             value,
-            headers)
+            headers,
+            Optional.empty())
         );
         globalStateTask.flushState();
     }
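
The two constructor calls patched in this file sit behind the driver's input
plumbing; tests normally reach them through the public test-utils API instead of
building `ConsumerRecord`s by hand. A minimal sketch, assuming a `topology` and a
`props` with the usual StreamsConfig entries already exist and an input topic
named "input" with String serializers (all names illustrative, not from this patch):

    import org.apache.kafka.common.serialization.StringSerializer;
    import org.apache.kafka.streams.TestInputTopic;
    import org.apache.kafka.streams.TopologyTestDriver;

    try (TopologyTestDriver driver = new TopologyTestDriver(topology, props)) {
        TestInputTopic<String, String> input = driver.createInputTopic(
            "input", new StringSerializer(), new StringSerializer());
        input.pipeInput("key", "value"); // lands in the ConsumerRecord built above
    }
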
diff --git a/streams/test-utils/src/main/java/org/apache/kafka/streams/test/ConsumerRecordFactory.java b/streams/test-utils/src/main/java/org/apache/kafka/streams/test/ConsumerRecordFactory.java
index ae71de8..15164f6 100644
--- a/streams/test-utils/src/main/java/org/apache/kafka/streams/test/ConsumerRecordFactory.java
+++ b/streams/test-utils/src/main/java/org/apache/kafka/streams/test/ConsumerRecordFactory.java
@@ -28,6 +28,7 @@ import org.apache.kafka.streams.TopologyTestDriver;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Objects;
+import java.util.Optional;
 
 /**
  * Factory to create {@link ConsumerRecord consumer records} for a single single-partitioned topic with given key and
@@ -190,12 +191,12 @@ public class ConsumerRecordFactory<K, V> {
             -1L,
             timestampMs,
             TimestampType.CREATE_TIME,
-            (long) ConsumerRecord.NULL_CHECKSUM,
             serializedKey == null ? 0 : serializedKey.length,
             serializedValue == null ? 0 : serializedValue.length,
             serializedKey,
             serializedValue,
-            headers);
+            headers,
+            Optional.empty());
     }
 
     /**
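
For reference, this factory is the older test-utils entry point (superseded by
`TestInputTopic`), and the `-1L` visible above appears to be a placeholder offset,
with real offsets assigned inside the driver when the record is piped in. A usage
sketch under those assumptions (topic name and serializer choices are illustrative):

    ConsumerRecordFactory<String, String> factory = new ConsumerRecordFactory<>(
        "input", new StringSerializer(), new StringSerializer());
    ConsumerRecord<byte[], byte[]> record = factory.create("key", "value");
    driver.pipeInput(record); // deprecated path; TestInputTopic replaces it
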
diff --git a/streams/test-utils/src/test/java/org/apache/kafka/streams/test/TestRecordTest.java b/streams/test-utils/src/test/java/org/apache/kafka/streams/test/TestRecordTest.java
index b16ea03..ad3b1a2 100644
--- a/streams/test-utils/src/test/java/org/apache/kafka/streams/test/TestRecordTest.java
+++ b/streams/test-utils/src/test/java/org/apache/kafka/streams/test/TestRecordTest.java
@@ -27,6 +27,7 @@ import org.apache.kafka.common.record.TimestampType;
 import org.junit.jupiter.api.Test;
 
 import java.time.Instant;
+import java.util.Optional;
 
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.MatcherAssert.assertThat;
@@ -152,8 +153,8 @@ public class TestRecordTest {
     @Test
     public void testConsumerRecord() {
         final String topicName = "topic";
-        final ConsumerRecord<String, Integer> consumerRecord =
-            new ConsumerRecord<>(topicName, 1, 0, recordMs, TimestampType.CREATE_TIME, 0L, 0, 0, key, value, headers);
+        final ConsumerRecord<String, Integer> consumerRecord = new ConsumerRecord<>(topicName, 1, 0, recordMs,
+            TimestampType.CREATE_TIME, 0, 0, key, value, headers, Optional.empty());
         final TestRecord<String, Integer> testRecord = new TestRecord<>(consumerRecord);
         final TestRecord<String, Integer> expectedRecord = new TestRecord<>(key, value, headers, recordTime);
         assertEquals(expectedRecord, testRecord);
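
Since the new constructor is always invoked positionally, a commented map of the
argument order as exercised in the test above may help when reading these hunks
(the parameter names in the comments are descriptive, not the javadoc's):

    new ConsumerRecord<>(
        topicName,                  // String topic
        1,                          // int partition
        0,                          // long offset
        recordMs,                   // long timestamp
        TimestampType.CREATE_TIME,  // TimestampType
        0,                          // serialized key size (ConsumerRecord.NULL_SIZE if unknown)
        0,                          // serialized value size
        key,                        // K key
        value,                      // V value
        headers,                    // Headers
        Optional.empty());          // Optional<Integer> leader epoch
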