You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2016/03/02 18:40:40 UTC

kafka git commit: KAFKA-3196; Added checksum and size to RecordMetadata and ConsumerRecord

Repository: kafka
Updated Branches:
  refs/heads/trunk 23f239b06 -> 002b377da


KAFKA-3196; Added checksum and size to RecordMetadata and ConsumerRecord

This is the second (remaining) part of KIP-42. See https://cwiki.apache.org/confluence/display/KAFKA/KIP-42%3A+Add+Producer+and+Consumer+Interceptors

Author: Anna Povzner <an...@confluent.io>

Reviewers: Ismael Juma <is...@juma.me.uk>, Jason Gustafson <ja...@confluent.io>, Jun Rao <ju...@gmail.com>

Closes #951 from apovzner/kafka-3196


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/002b377d
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/002b377d
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/002b377d

Branch: refs/heads/trunk
Commit: 002b377dad9c956cd0ae0597981f29698883b6d5
Parents: 23f239b
Author: Anna Povzner <an...@confluent.io>
Authored: Wed Mar 2 09:40:34 2016 -0800
Committer: Jun Rao <ju...@gmail.com>
Committed: Wed Mar 2 09:40:34 2016 -0800

----------------------------------------------------------------------
 .../clients/consumer/ConsumerInterceptor.java   |  5 ++-
 .../kafka/clients/consumer/ConsumerRecord.java  | 37 ++++++++++++++++++-
 .../clients/consumer/internals/Fetcher.java     | 14 +++++---
 .../kafka/clients/producer/MockProducer.java    |  6 ++--
 .../kafka/clients/producer/RecordMetadata.java  | 38 ++++++++++++++++++--
 .../internals/FutureRecordMetadata.java         | 24 +++++++++++--
 .../clients/producer/internals/RecordBatch.java | 12 +++++--
 .../apache/kafka/common/record/Compressor.java  | 16 ++++++---
 .../kafka/common/record/MemoryRecords.java      |  6 ++--
 .../clients/consumer/MockConsumerTest.java      |  4 +--
 .../internals/ConsumerInterceptorsTest.java     |  8 +++--
 .../kafka/clients/producer/RecordSendTest.java  |  9 +++--
 .../internals/ProducerInterceptorsTest.java     |  2 +-
 .../kafka/test/MockConsumerInterceptor.java     |  6 ++--
 .../connect/runtime/WorkerSinkTaskTest.java     |  2 +-
 .../runtime/WorkerSinkTaskThreadedTest.java     |  4 +--
 .../connect/runtime/WorkerSourceTaskTest.java   |  3 +-
 .../connect/storage/KafkaConfigStorageTest.java | 24 ++++++-------
 .../storage/KafkaOffsetBackingStoreTest.java    | 16 ++++-----
 .../storage/KafkaStatusBackingStoreTest.java    |  2 +-
 .../kafka/connect/util/KafkaBasedLogTest.java   | 16 ++++-----
 .../scala/kafka/tools/ConsoleConsumer.scala     |  4 +--
 .../scala/kafka/tools/SimpleConsumerShell.scala |  4 ++-
 .../kafka/api/BaseConsumerTest.scala            |  3 ++
 .../kafka/api/BaseProducerSendTest.scala        |  7 ++++
 .../processor/internals/RecordQueue.java        |  7 ++--
 .../processor/internals/PartitionGroupTest.java | 12 +++----
 .../internals/ProcessorStateManagerTest.java    |  6 ++--
 .../processor/internals/RecordQueueTest.java    | 18 +++++-----
 .../processor/internals/StandbyTaskTest.java    | 18 +++++-----
 .../processor/internals/StreamTaskTest.java     | 30 ++++++++--------
 .../kafka/test/ProcessorTopologyTestDriver.java |  2 +-
 32 files changed, 251 insertions(+), 114 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/002b377d/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerInterceptor.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerInterceptor.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerInterceptor.java
index 5c13a41..70ea444 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerInterceptor.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerInterceptor.java
@@ -38,7 +38,10 @@ public interface ConsumerInterceptor<K, V> extends Configurable {
     /**
      * This is called just before the records are returned by {@link org.apache.kafka.clients.consumer.KafkaConsumer#poll(long)}
      * <p>
-     * This method is allowed to modify consumer records, in which case the new records will be returned.
+     * This method is allowed to modify consumer records, in which case the new records will be
+     * returned. There is no limitation on number of records that could be returned from this
+     * method. I.e., the interceptor can filter the records or generate new records.
+     * <p>
      * Any exception thrown by this method will be caught by the caller, logged, but not propagated to the client.
      * <p>
      * Since the consumer may run multiple interceptors, a particular interceptor's onConsume() callback will be called

http://git-wip-us.apache.org/repos/asf/kafka/blob/002b377d/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecord.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecord.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecord.java
index 42e0a90..4165534 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecord.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecord.java
@@ -24,6 +24,9 @@ public final class ConsumerRecord<K, V> {
     private final long offset;
     private final long timestamp;
     private final TimestampType timestampType;
+    private final long checksum;
+    private final int serializedKeySize;
+    private final int serializedValueSize;
     private final K key;
     private final V value;
 
@@ -43,6 +46,9 @@ public final class ConsumerRecord<K, V> {
                           long offset,
                           long timestamp,
                           TimestampType timestampType,
+                          long checksum,
+                          int serializedKeySize,
+                          int serializedValueSize,
                           K key,
                           V value) {
         if (topic == null)
@@ -52,6 +58,9 @@ public final class ConsumerRecord<K, V> {
         this.offset = offset;
         this.timestamp = timestamp;
         this.timestampType = timestampType;
+        this.checksum = checksum;
+        this.serializedKeySize = serializedKeySize;
+        this.serializedValueSize = serializedValueSize;
         this.key = key;
         this.value = value;
     }
@@ -105,9 +114,35 @@ public final class ConsumerRecord<K, V> {
         return timestampType;
     }
 
+    /**
+     * The checksum (CRC32) of the record.
+     */
+    public long checksum() {
+        return this.checksum;
+    }
+
+    /**
+     * The size of the serialized, uncompressed key in bytes. If key is null, the returned size
+     * is -1.
+     */
+    public int serializedKeySize() {
+        return this.serializedKeySize;
+    }
+
+    /**
+     * The size of the serialized, uncompressed value in bytes. If value is null, the
+     * returned size is -1.
+     */
+    public int serializedValueSize() {
+        return this.serializedValueSize;
+    }
+
     @Override
     public String toString() {
         return "ConsumerRecord(topic = " + topic() + ", partition = " + partition() + ", offset = " + offset()
-                + ", " + timestampType + " = " + timestamp + ", key = " + key + ", value = " + value + ")";
+               + ", " + timestampType + " = " + timestamp + ", checksum = " + checksum
+               + ", serialized key size = "  + serializedKeySize
+               + ", serialized value size = " + serializedValueSize
+               + ", key = " + key + ", value = " + value + ")";
     }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/002b377d/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
index 5d92a76..e2a5548 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
@@ -644,11 +644,17 @@ public class Fetcher<K, V> {
             long timestamp = logEntry.record().timestamp();
             TimestampType timestampType = logEntry.record().timestampType();
             ByteBuffer keyBytes = logEntry.record().key();
-            K key = keyBytes == null ? null : this.keyDeserializer.deserialize(partition.topic(), Utils.toArray(keyBytes));
+            byte[] keyByteArray = keyBytes == null ? null : Utils.toArray(keyBytes);
+            K key = keyBytes == null ? null : this.keyDeserializer.deserialize(partition.topic(), keyByteArray);
             ByteBuffer valueBytes = logEntry.record().value();
-            V value = valueBytes == null ? null : this.valueDeserializer.deserialize(partition.topic(), Utils.toArray(valueBytes));
-
-            return new ConsumerRecord<>(partition.topic(), partition.partition(), offset, timestamp, timestampType, key, value);
+            byte[] valueByteArray = valueBytes == null ? null : Utils.toArray(valueBytes);
+            V value = valueBytes == null ? null : this.valueDeserializer.deserialize(partition.topic(), valueByteArray);
+
+            return new ConsumerRecord<>(partition.topic(), partition.partition(), offset,
+                                        timestamp, timestampType, logEntry.record().checksum(),
+                                        keyByteArray == null ? -1 : keyByteArray.length,
+                                        valueByteArray == null ? -1 : valueByteArray.length,
+                                        key, value);
         } catch (KafkaException e) {
             throw e;
         } catch (RuntimeException e) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/002b377d/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java
index 5f97bae..109b0ca 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java
@@ -117,10 +117,12 @@ public class MockProducer<K, V> implements Producer<K, V> {
         if (this.cluster.partitionsForTopic(record.topic()) != null)
             partition = partition(record, this.cluster);
         ProduceRequestResult result = new ProduceRequestResult();
-        FutureRecordMetadata future = new FutureRecordMetadata(result, 0, Record.NO_TIMESTAMP);
+        FutureRecordMetadata future = new FutureRecordMetadata(result, 0, Record.NO_TIMESTAMP, 0, 0, 0);
         TopicPartition topicPartition = new TopicPartition(record.topic(), partition);
         long offset = nextOffset(topicPartition);
-        Completion completion = new Completion(topicPartition, offset, new RecordMetadata(topicPartition, 0, offset, Record.NO_TIMESTAMP), result, callback);
+        Completion completion = new Completion(topicPartition, offset,
+                                               new RecordMetadata(topicPartition, 0, offset, Record.NO_TIMESTAMP, 0, 0, 0),
+                                               result, callback);
         this.sent.add(record);
         if (autoComplete)
             completion.complete(null);

http://git-wip-us.apache.org/repos/asf/kafka/blob/002b377d/clients/src/main/java/org/apache/kafka/clients/producer/RecordMetadata.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/RecordMetadata.java b/clients/src/main/java/org/apache/kafka/clients/producer/RecordMetadata.java
index d9ea239..c60a53d 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/RecordMetadata.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/RecordMetadata.java
@@ -30,19 +30,28 @@ public final class RecordMetadata {
     // user provided one. Otherwise, it will be the producer local time when the producer record was handed to the
     // producer.
     private final long timestamp;
+    private final long checksum;
+    private final int serializedKeySize;
+    private final int serializedValueSize;
     private final TopicPartition topicPartition;
 
-    private RecordMetadata(TopicPartition topicPartition, long offset, long timestamp) {
+    private RecordMetadata(TopicPartition topicPartition, long offset, long timestamp, long
+        checksum, int serializedKeySize, int serializedValueSize) {
         super();
         this.offset = offset;
         this.timestamp = timestamp;
+        this.checksum = checksum;
+        this.serializedKeySize = serializedKeySize;
+        this.serializedValueSize = serializedValueSize;
         this.topicPartition = topicPartition;
     }
 
-    public RecordMetadata(TopicPartition topicPartition, long baseOffset, long relativeOffset, long timestamp) {
+    public RecordMetadata(TopicPartition topicPartition, long baseOffset, long relativeOffset,
+                          long timestamp, long checksum, int serializedKeySize, int serializedValueSize) {
         // ignore the relativeOffset if the base offset is -1,
         // since this indicates the offset is unknown
-        this(topicPartition, baseOffset == -1 ? baseOffset : baseOffset + relativeOffset, timestamp);
+        this(topicPartition, baseOffset == -1 ? baseOffset : baseOffset + relativeOffset,
+             timestamp, checksum, serializedKeySize, serializedValueSize);
     }
 
     /**
@@ -60,6 +69,29 @@ public final class RecordMetadata {
     }
 
     /**
+     * The checksum (CRC32) of the record.
+     */
+    public long checksum() {
+        return this.checksum;
+    }
+
+    /**
+     * The size of the serialized, uncompressed key in bytes. If key is null, the returned size
+     * is -1.
+     */
+    public int serializedKeySize() {
+        return this.serializedKeySize;
+    }
+
+    /**
+     * The size of the serialized, uncompressed value in bytes. If value is null, the returned
+     * size is -1.
+     */
+    public int serializedValueSize() {
+        return this.serializedValueSize;
+    }
+
+    /**
      * The topic the record was appended to
      */
     public String topic() {

http://git-wip-us.apache.org/repos/asf/kafka/blob/002b377d/clients/src/main/java/org/apache/kafka/clients/producer/internals/FutureRecordMetadata.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/FutureRecordMetadata.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/FutureRecordMetadata.java
index a140371..d5995a3 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/FutureRecordMetadata.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/FutureRecordMetadata.java
@@ -27,11 +27,18 @@ public final class FutureRecordMetadata implements Future<RecordMetadata> {
     private final ProduceRequestResult result;
     private final long relativeOffset;
     private final long timestamp;
+    private final long checksum;
+    private final int serializedKeySize;
+    private final int serializedValueSize;
 
-    public FutureRecordMetadata(ProduceRequestResult result, long relativeOffset, long timestamp) {
+    public FutureRecordMetadata(ProduceRequestResult result, long relativeOffset, long timestamp,
+                                long checksum, int serializedKeySize, int serializedValueSize) {
         this.result = result;
         this.relativeOffset = relativeOffset;
         this.timestamp = timestamp;
+        this.checksum = checksum;
+        this.serializedKeySize = serializedKeySize;
+        this.serializedValueSize = serializedValueSize;
     }
 
     @Override
@@ -61,7 +68,8 @@ public final class FutureRecordMetadata implements Future<RecordMetadata> {
     }
     
     RecordMetadata value() {
-        return new RecordMetadata(result.topicPartition(), this.result.baseOffset(), this.relativeOffset, this.timestamp);
+        return new RecordMetadata(result.topicPartition(), this.result.baseOffset(), this.relativeOffset,
+                                  this.timestamp, this.checksum, this.serializedKeySize, this.serializedValueSize);
     }
     
     public long relativeOffset() {
@@ -72,6 +80,18 @@ public final class FutureRecordMetadata implements Future<RecordMetadata> {
         return this.timestamp;
     }
 
+    public long checksum() {
+        return this.checksum;
+    }
+
+    public int serializedKeySize() {
+        return this.serializedKeySize;
+    }
+
+    public int serializedValueSize() {
+        return this.serializedValueSize;
+    }
+
     @Override
     public boolean isCancelled() {
         return false;

http://git-wip-us.apache.org/repos/asf/kafka/blob/002b377d/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java
index af9095d..7b5fbbe 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java
@@ -67,10 +67,13 @@ public final class RecordBatch {
         if (!this.records.hasRoomFor(key, value)) {
             return null;
         } else {
-            this.records.append(offsetCounter++, timestamp, key, value);
+            long checksum = this.records.append(offsetCounter++, timestamp, key, value);
             this.maxRecordSize = Math.max(this.maxRecordSize, Record.recordSize(key, value));
             this.lastAppendTime = now;
-            FutureRecordMetadata future = new FutureRecordMetadata(this.produceFuture, this.recordCount, timestamp);
+            FutureRecordMetadata future = new FutureRecordMetadata(this.produceFuture, this.recordCount,
+                                                                   timestamp, checksum,
+                                                                   key == null ? -1 : key.length,
+                                                                   value == null ? -1 : value.length);
             if (callback != null)
                 thunks.add(new Thunk(callback, future));
             this.recordCount++;
@@ -97,7 +100,10 @@ public final class RecordBatch {
                 if (exception == null) {
                     // If the timestamp returned by server is NoTimestamp, that means CreateTime is used. Otherwise LogAppendTime is used.
                     RecordMetadata metadata = new RecordMetadata(this.topicPartition,  baseOffset, thunk.future.relativeOffset(),
-                        timestamp == Record.NO_TIMESTAMP ? thunk.future.timestamp() : timestamp);
+                                                                 timestamp == Record.NO_TIMESTAMP ? thunk.future.timestamp() : timestamp,
+                                                                 thunk.future.checksum(),
+                                                                 thunk.future.serializedKeySize(),
+                                                                 thunk.future.serializedValueSize());
                     thunk.callback.onCompletion(metadata, null);
                 } else {
                     thunk.callback.onCompletion(null, exception);

http://git-wip-us.apache.org/repos/asf/kafka/blob/002b377d/clients/src/main/java/org/apache/kafka/common/record/Compressor.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/Compressor.java b/clients/src/main/java/org/apache/kafka/common/record/Compressor.java
index cde2178..afa85a4 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/Compressor.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/Compressor.java
@@ -201,16 +201,24 @@ public class Compressor {
         }
     }
 
-    public void putRecord(long timestamp, byte[] key, byte[] value, CompressionType type, int valueOffset, int valueSize) {
+    /**
+     * @return CRC of the record
+     */
+    public long putRecord(long timestamp, byte[] key, byte[] value, CompressionType type,
+                          int valueOffset, int valueSize) {
         // put a record as un-compressed into the underlying stream
         long crc = Record.computeChecksum(timestamp, key, value, type, valueOffset, valueSize);
         byte attributes = Record.computeAttributes(type);
         putRecord(crc, attributes, timestamp, key, value, valueOffset, valueSize);
-
+        return crc;
     }
 
-    public void putRecord(long timestamp, byte[] key, byte[] value) {
-        putRecord(timestamp, key, value, CompressionType.NONE, 0, -1);
+    /**
+     * Put a record as uncompressed into the underlying stream
+     * @return CRC of the record
+     */
+    public long putRecord(long timestamp, byte[] key, byte[] value) {
+        return putRecord(timestamp, key, value, CompressionType.NONE, 0, -1);
     }
 
     private void putRecord(final long crc, final byte attributes, final long timestamp, final byte[] key, final byte[] value, final int valueOffset, final int valueSize) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/002b377d/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
index 01da1e2..f37ef39 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
@@ -88,16 +88,18 @@ public class MemoryRecords implements Records {
 
     /**
      * Append a new record and offset to the buffer
+     * @return crc of the record
      */
-    public void append(long offset, long timestamp, byte[] key, byte[] value) {
+    public long append(long offset, long timestamp, byte[] key, byte[] value) {
         if (!writable)
             throw new IllegalStateException("Memory records is not writable");
 
         int size = Record.recordSize(key, value);
         compressor.putLong(offset);
         compressor.putInt(size);
-        compressor.putRecord(timestamp, key, value);
+        long crc = compressor.putRecord(timestamp, key, value);
         compressor.recordWritten(size + Records.LOG_OVERHEAD);
+        return crc;
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/kafka/blob/002b377d/clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java
index 3ef5c8b..70b1c09 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java
@@ -43,8 +43,8 @@ public class MockConsumerTest {
         beginningOffsets.put(new TopicPartition("test", 1), 0L);
         consumer.updateBeginningOffsets(beginningOffsets);
         consumer.seek(new TopicPartition("test", 0), 0);
-        ConsumerRecord<String, String> rec1 = new ConsumerRecord<String, String>("test", 0, 0, 0L, TimestampType.CREATE_TIME, "key1", "value1");
-        ConsumerRecord<String, String> rec2 = new ConsumerRecord<String, String>("test", 0, 1, 0L, TimestampType.CREATE_TIME, "key2", "value2");
+        ConsumerRecord<String, String> rec1 = new ConsumerRecord<String, String>("test", 0, 0, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, "key1", "value1");
+        ConsumerRecord<String, String> rec2 = new ConsumerRecord<String, String>("test", 0, 1, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, "key2", "value2");
         consumer.addRecord(rec1);
         consumer.addRecord(rec2);
         ConsumerRecords<String, String> recs = consumer.poll(1);

http://git-wip-us.apache.org/repos/asf/kafka/blob/002b377d/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerInterceptorsTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerInterceptorsTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerInterceptorsTest.java
index 25843c7..4259c75 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerInterceptorsTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerInterceptorsTest.java
@@ -44,7 +44,7 @@ public class ConsumerInterceptorsTest {
     private final TopicPartition filterTopicPart1 = new TopicPartition("test5", filterPartition1);
     private final TopicPartition filterTopicPart2 = new TopicPartition("test6", filterPartition2);
     private final ConsumerRecord<Integer, Integer> consumerRecord =
-        new ConsumerRecord<>(topic, partition, 0, 0L, TimestampType.CREATE_TIME, 1, 1);
+        new ConsumerRecord<>(topic, partition, 0, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, 1, 1);
     private int onCommitCount = 0;
     private int onConsumeCount = 0;
 
@@ -117,9 +117,11 @@ public class ConsumerInterceptorsTest {
         List<ConsumerRecord<Integer, Integer>> list1 = new ArrayList<>();
         list1.add(consumerRecord);
         List<ConsumerRecord<Integer, Integer>> list2 = new ArrayList<>();
-        list2.add(new ConsumerRecord<>(filterTopicPart1.topic(), filterTopicPart1.partition(), 0, 0L, TimestampType.CREATE_TIME, 1, 1));
+        list2.add(new ConsumerRecord<>(filterTopicPart1.topic(), filterTopicPart1.partition(), 0,
+                                       0L, TimestampType.CREATE_TIME, 0L, 0, 0, 1, 1));
         List<ConsumerRecord<Integer, Integer>> list3 = new ArrayList<>();
-        list3.add(new ConsumerRecord<>(filterTopicPart2.topic(), filterTopicPart2.partition(), 0, 0L, TimestampType.CREATE_TIME, 1, 1));
+        list3.add(new ConsumerRecord<>(filterTopicPart2.topic(), filterTopicPart2.partition(), 0,
+                                       0L, TimestampType.CREATE_TIME, 0L, 0, 0, 1, 1));
         records.put(tp, list1);
         records.put(filterTopicPart1, list2);
         records.put(filterTopicPart2, list3);

http://git-wip-us.apache.org/repos/asf/kafka/blob/002b377d/clients/src/test/java/org/apache/kafka/clients/producer/RecordSendTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/RecordSendTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/RecordSendTest.java
index 5591129..d820dab 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/RecordSendTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/RecordSendTest.java
@@ -45,7 +45,8 @@ public class RecordSendTest {
     @Test
     public void testTimeout() throws Exception {
         ProduceRequestResult request = new ProduceRequestResult();
-        FutureRecordMetadata future = new FutureRecordMetadata(request, relOffset, Record.NO_TIMESTAMP);
+        FutureRecordMetadata future = new FutureRecordMetadata(request, relOffset,
+                                                               Record.NO_TIMESTAMP, 0, 0, 0);
         assertFalse("Request is not completed", future.isDone());
         try {
             future.get(5, TimeUnit.MILLISECONDS);
@@ -63,7 +64,8 @@ public class RecordSendTest {
      */
     @Test(expected = ExecutionException.class)
     public void testError() throws Exception {
-        FutureRecordMetadata future = new FutureRecordMetadata(asyncRequest(baseOffset, new CorruptRecordException(), 50L), relOffset, Record.NO_TIMESTAMP);
+        FutureRecordMetadata future = new FutureRecordMetadata(asyncRequest(baseOffset, new CorruptRecordException(), 50L),
+                                                               relOffset, Record.NO_TIMESTAMP, 0, 0, 0);
         future.get();
     }
 
@@ -72,7 +74,8 @@ public class RecordSendTest {
      */
     @Test
     public void testBlocking() throws Exception {
-        FutureRecordMetadata future = new FutureRecordMetadata(asyncRequest(baseOffset, null, 50L), relOffset, Record.NO_TIMESTAMP);
+        FutureRecordMetadata future = new FutureRecordMetadata(asyncRequest(baseOffset, null, 50L),
+                                                               relOffset, Record.NO_TIMESTAMP, 0, 0, 0);
         assertEquals(baseOffset + relOffset, future.get().offset());
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/002b377d/clients/src/test/java/org/apache/kafka/clients/producer/internals/ProducerInterceptorsTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/ProducerInterceptorsTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/ProducerInterceptorsTest.java
index 26d15d0..5a32dda 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/ProducerInterceptorsTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/ProducerInterceptorsTest.java
@@ -128,7 +128,7 @@ public class ProducerInterceptorsTest {
         ProducerInterceptors<Integer, String> interceptors = new ProducerInterceptors<>(interceptorList);
 
         // verify onAck is called on all interceptors
-        RecordMetadata meta = new RecordMetadata(tp, 0, 0, 0);
+        RecordMetadata meta = new RecordMetadata(tp, 0, 0, 0, 0, 0, 0);
         interceptors.onAcknowledgement(meta, null);
         assertEquals(2, onAckCount);
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/002b377d/clients/src/test/java/org/apache/kafka/test/MockConsumerInterceptor.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/test/MockConsumerInterceptor.java b/clients/src/test/java/org/apache/kafka/test/MockConsumerInterceptor.java
index 3246578..0c187cb 100644
--- a/clients/src/test/java/org/apache/kafka/test/MockConsumerInterceptor.java
+++ b/clients/src/test/java/org/apache/kafka/test/MockConsumerInterceptor.java
@@ -25,7 +25,6 @@ import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.config.ConfigException;
-import org.apache.kafka.common.record.TimestampType;
 
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -57,7 +56,10 @@ public class MockConsumerInterceptor implements ConsumerInterceptor<String, Stri
             List<ConsumerRecord<String, String>> lst = new ArrayList<>();
             for (ConsumerRecord<String, String> record: records.records(tp)) {
                 lst.add(new ConsumerRecord<>(record.topic(), record.partition(), record.offset(),
-                    0L, TimestampType.CREATE_TIME, record.key(), record.value().toUpperCase()));
+                                             record.timestamp(), record.timestampType(),
+                                             record.checksum(), record.serializedKeySize(),
+                                             record.serializedValueSize(),
+                                             record.key(), record.value().toUpperCase()));
             }
             recordMap.put(tp, lst);
         }

http://git-wip-us.apache.org/repos/asf/kafka/blob/002b377d/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
index 6721609..aef3344 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
@@ -293,7 +293,7 @@ public class WorkerSinkTaskTest {
                     public ConsumerRecords<byte[], byte[]> answer() throws Throwable {
                         List<ConsumerRecord<byte[], byte[]>> records = new ArrayList<>();
                         for (int i = 0; i < numMessages; i++)
-                            records.add(new ConsumerRecord<>(TOPIC, PARTITION, FIRST_OFFSET + recordsReturned + i, 0L, TimestampType.CREATE_TIME, RAW_KEY, RAW_VALUE));
+                            records.add(new ConsumerRecord<>(TOPIC, PARTITION, FIRST_OFFSET + recordsReturned + i, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, RAW_KEY, RAW_VALUE));
                         recordsReturned += numMessages;
                         return new ConsumerRecords<>(
                                 numMessages > 0 ?

http://git-wip-us.apache.org/repos/asf/kafka/blob/002b377d/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java
index 77f1ed0..b37b34f 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java
@@ -520,7 +520,7 @@ public class WorkerSinkTaskThreadedTest extends ThreadedTest {
                                 Collections.singletonMap(
                                         new TopicPartition(TOPIC, PARTITION),
                                         Arrays.asList(
-                                                new ConsumerRecord<>(TOPIC, PARTITION, FIRST_OFFSET + recordsReturned, 0L, TimestampType.CREATE_TIME, RAW_KEY, RAW_VALUE)
+                                                new ConsumerRecord<>(TOPIC, PARTITION, FIRST_OFFSET + recordsReturned, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, RAW_KEY, RAW_VALUE)
                                         )));
                         recordsReturned++;
                         return records;
@@ -548,7 +548,7 @@ public class WorkerSinkTaskThreadedTest extends ThreadedTest {
                                 Collections.singletonMap(
                                         new TopicPartition(TOPIC, PARTITION),
                                         Arrays.asList(
-                                                new ConsumerRecord<>(TOPIC, PARTITION, FIRST_OFFSET + recordsReturned, 0L, TimestampType.CREATE_TIME, RAW_KEY, RAW_VALUE)
+                                                new ConsumerRecord<>(TOPIC, PARTITION, FIRST_OFFSET + recordsReturned, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, RAW_KEY, RAW_VALUE)
                                         )));
                         recordsReturned++;
                         return records;

http://git-wip-us.apache.org/repos/asf/kafka/blob/002b377d/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
index 8e9eb72..8fb8bb5 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
@@ -408,7 +408,8 @@ public class WorkerSourceTaskTest extends ThreadedTest {
             public Future<RecordMetadata> answer() throws Throwable {
                 synchronized (producerCallbacks) {
                     for (org.apache.kafka.clients.producer.Callback cb : producerCallbacks.getValues()) {
-                        cb.onCompletion(new RecordMetadata(new TopicPartition("foo", 0), 0, 0, 0L), null);
+                        cb.onCompletion(new RecordMetadata(new TopicPartition("foo", 0), 0, 0,
+                                                           0L, 0L, 0, 0), null);
                     }
                     producerCallbacks.reset();
                 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/002b377d/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigStorageTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigStorageTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigStorageTest.java
index e878e12..cfc713f 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigStorageTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigStorageTest.java
@@ -289,14 +289,14 @@ public class KafkaConfigStorageTest {
         expectConfigure();
         // Overwrite each type at least once to ensure we see the latest data after loading
         List<ConsumerRecord<String, byte[]>> existingRecords = Arrays.asList(
-                new ConsumerRecord<>(TOPIC, 0, 0, 0L, TimestampType.CREATE_TIME, CONNECTOR_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(0)),
-                new ConsumerRecord<>(TOPIC, 0, 1, 0L, TimestampType.CREATE_TIME, TASK_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(1)),
-                new ConsumerRecord<>(TOPIC, 0, 2, 0L, TimestampType.CREATE_TIME, TASK_CONFIG_KEYS.get(1), CONFIGS_SERIALIZED.get(2)),
-                new ConsumerRecord<>(TOPIC, 0, 3, 0L, TimestampType.CREATE_TIME, CONNECTOR_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(3)),
-                new ConsumerRecord<>(TOPIC, 0, 4, 0L, TimestampType.CREATE_TIME, COMMIT_TASKS_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(4)),
+                new ConsumerRecord<>(TOPIC, 0, 0, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, CONNECTOR_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(0)),
+                new ConsumerRecord<>(TOPIC, 0, 1, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, TASK_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(1)),
+                new ConsumerRecord<>(TOPIC, 0, 2, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, TASK_CONFIG_KEYS.get(1), CONFIGS_SERIALIZED.get(2)),
+                new ConsumerRecord<>(TOPIC, 0, 3, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, CONNECTOR_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(3)),
+                new ConsumerRecord<>(TOPIC, 0, 4, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, COMMIT_TASKS_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(4)),
                 // Connector after root update should make it through, task update shouldn't
-                new ConsumerRecord<>(TOPIC, 0, 5, 0L, TimestampType.CREATE_TIME, CONNECTOR_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(5)),
-                new ConsumerRecord<>(TOPIC, 0, 6, 0L, TimestampType.CREATE_TIME, TASK_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(6)));
+                new ConsumerRecord<>(TOPIC, 0, 5, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, CONNECTOR_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(5)),
+                new ConsumerRecord<>(TOPIC, 0, 6, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, TASK_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(6)));
         LinkedHashMap<byte[], Struct> deserialized = new LinkedHashMap();
         deserialized.put(CONFIGS_SERIALIZED.get(0), CONNECTOR_CONFIG_STRUCTS.get(0));
         deserialized.put(CONFIGS_SERIALIZED.get(1), TASK_CONFIG_STRUCTS.get(0));
@@ -343,12 +343,12 @@ public class KafkaConfigStorageTest {
 
         expectConfigure();
         List<ConsumerRecord<String, byte[]>> existingRecords = Arrays.asList(
-                new ConsumerRecord<>(TOPIC, 0, 0, 0L, TimestampType.CREATE_TIME, CONNECTOR_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(0)),
+                new ConsumerRecord<>(TOPIC, 0, 0, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, CONNECTOR_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(0)),
                 // This is the record that has been compacted:
                 //new ConsumerRecord<>(TOPIC, 0, 1, TASK_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(1)),
-                new ConsumerRecord<>(TOPIC, 0, 2, 0L, TimestampType.CREATE_TIME, TASK_CONFIG_KEYS.get(1), CONFIGS_SERIALIZED.get(2)),
-                new ConsumerRecord<>(TOPIC, 0, 4, 0L, TimestampType.CREATE_TIME, COMMIT_TASKS_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(4)),
-                new ConsumerRecord<>(TOPIC, 0, 5, 0L, TimestampType.CREATE_TIME, TASK_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(5)));
+                new ConsumerRecord<>(TOPIC, 0, 2, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, TASK_CONFIG_KEYS.get(1), CONFIGS_SERIALIZED.get(2)),
+                new ConsumerRecord<>(TOPIC, 0, 4, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, COMMIT_TASKS_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(4)),
+                new ConsumerRecord<>(TOPIC, 0, 5, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, TASK_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(5)));
         LinkedHashMap<byte[], Struct> deserialized = new LinkedHashMap();
         deserialized.put(CONFIGS_SERIALIZED.get(0), CONNECTOR_CONFIG_STRUCTS.get(0));
         deserialized.put(CONFIGS_SERIALIZED.get(2), TASK_CONFIG_STRUCTS.get(0));
@@ -484,7 +484,7 @@ public class KafkaConfigStorageTest {
                     public Future<Void> answer() throws Throwable {
                         TestFuture<Void> future = new TestFuture<Void>();
                         for (Map.Entry<String, byte[]> entry : serializedConfigs.entrySet())
-                            capturedConsumedCallback.getValue().onCompletion(null, new ConsumerRecord<>(TOPIC, 0, logOffset++, 0L, TimestampType.CREATE_TIME, entry.getKey(), entry.getValue()));
+                            capturedConsumedCallback.getValue().onCompletion(null, new ConsumerRecord<>(TOPIC, 0, logOffset++, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, entry.getKey(), entry.getValue()));
                         future.resolveOnGet((Void) null);
                         return future;
                     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/002b377d/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStoreTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStoreTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStoreTest.java
index 61763a8..22bb376 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStoreTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStoreTest.java
@@ -126,10 +126,10 @@ public class KafkaOffsetBackingStoreTest {
     public void testReloadOnStart() throws Exception {
         expectConfigure();
         expectStart(Arrays.asList(
-                new ConsumerRecord<>(TOPIC, 0, 0, 0L, TimestampType.CREATE_TIME, TP0_KEY.array(), TP0_VALUE.array()),
-                new ConsumerRecord<>(TOPIC, 1, 0, 0L, TimestampType.CREATE_TIME, TP1_KEY.array(), TP1_VALUE.array()),
-                new ConsumerRecord<>(TOPIC, 0, 1, 0L, TimestampType.CREATE_TIME, TP0_KEY.array(), TP0_VALUE_NEW.array()),
-                new ConsumerRecord<>(TOPIC, 1, 1, 0L, TimestampType.CREATE_TIME, TP1_KEY.array(), TP1_VALUE_NEW.array())
+                new ConsumerRecord<>(TOPIC, 0, 0, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, TP0_KEY.array(), TP0_VALUE.array()),
+                new ConsumerRecord<>(TOPIC, 1, 0, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, TP1_KEY.array(), TP1_VALUE.array()),
+                new ConsumerRecord<>(TOPIC, 0, 1, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, TP0_KEY.array(), TP0_VALUE_NEW.array()),
+                new ConsumerRecord<>(TOPIC, 1, 1, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, TP1_KEY.array(), TP1_VALUE_NEW.array())
         ));
         expectStop();
 
@@ -177,8 +177,8 @@ public class KafkaOffsetBackingStoreTest {
         PowerMock.expectLastCall().andAnswer(new IAnswer<Object>() {
             @Override
             public Object answer() throws Throwable {
-                capturedConsumedCallback.getValue().onCompletion(null, new ConsumerRecord<>(TOPIC, 0, 0, 0L, TimestampType.CREATE_TIME, TP0_KEY.array(), TP0_VALUE.array()));
-                capturedConsumedCallback.getValue().onCompletion(null, new ConsumerRecord<>(TOPIC, 1, 0, 0L, TimestampType.CREATE_TIME, TP1_KEY.array(), TP1_VALUE.array()));
+                capturedConsumedCallback.getValue().onCompletion(null, new ConsumerRecord<>(TOPIC, 0, 0, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, TP0_KEY.array(), TP0_VALUE.array()));
+                capturedConsumedCallback.getValue().onCompletion(null, new ConsumerRecord<>(TOPIC, 1, 0, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, TP1_KEY.array(), TP1_VALUE.array()));
                 secondGetReadToEndCallback.getValue().onCompletion(null, null);
                 return null;
             }
@@ -190,8 +190,8 @@ public class KafkaOffsetBackingStoreTest {
         PowerMock.expectLastCall().andAnswer(new IAnswer<Object>() {
             @Override
             public Object answer() throws Throwable {
-                capturedConsumedCallback.getValue().onCompletion(null, new ConsumerRecord<>(TOPIC, 0, 1, 0L, TimestampType.CREATE_TIME, TP0_KEY.array(), TP0_VALUE_NEW.array()));
-                capturedConsumedCallback.getValue().onCompletion(null, new ConsumerRecord<>(TOPIC, 1, 1, 0L, TimestampType.CREATE_TIME, TP1_KEY.array(), TP1_VALUE_NEW.array()));
+                capturedConsumedCallback.getValue().onCompletion(null, new ConsumerRecord<>(TOPIC, 0, 1, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, TP0_KEY.array(), TP0_VALUE_NEW.array()));
+                capturedConsumedCallback.getValue().onCompletion(null, new ConsumerRecord<>(TOPIC, 1, 1, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, TP1_KEY.array(), TP1_VALUE_NEW.array()));
                 thirdGetReadToEndCallback.getValue().onCompletion(null, null);
                 return null;
             }

http://git-wip-us.apache.org/repos/asf/kafka/blob/002b377d/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaStatusBackingStoreTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaStatusBackingStoreTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaStatusBackingStoreTest.java
index cdbab64..8acd31f 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaStatusBackingStoreTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaStatusBackingStoreTest.java
@@ -367,7 +367,7 @@ public class KafkaStatusBackingStoreTest extends EasyMockSupport {
 
     private static ConsumerRecord<String, byte[]> consumerRecord(long offset, String key, byte[] value) {
         return new ConsumerRecord<>(STATUS_TOPIC, 0, offset, System.currentTimeMillis(),
-                TimestampType.CREATE_TIME, key, value);
+                TimestampType.CREATE_TIME, 0L, 0, 0, key, value);
     }
 
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/002b377d/connect/runtime/src/test/java/org/apache/kafka/connect/util/KafkaBasedLogTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/util/KafkaBasedLogTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/util/KafkaBasedLogTest.java
index b2246f5..ec58cb6 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/util/KafkaBasedLogTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/util/KafkaBasedLogTest.java
@@ -183,7 +183,7 @@ public class KafkaBasedLogTest {
                 consumer.schedulePollTask(new Runnable() {
                     @Override
                     public void run() {
-                        consumer.addRecord(new ConsumerRecord<>(TOPIC, 0, 0, 0L, TimestampType.CREATE_TIME, TP0_KEY, TP0_VALUE));
+                        consumer.addRecord(new ConsumerRecord<>(TOPIC, 0, 0, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, TP0_KEY, TP0_VALUE));
                     }
                 });
                 consumer.scheduleNopPollTask();
@@ -191,7 +191,7 @@ public class KafkaBasedLogTest {
                 consumer.schedulePollTask(new Runnable() {
                     @Override
                     public void run() {
-                        consumer.addRecord(new ConsumerRecord<>(TOPIC, 1, 0, 0L, TimestampType.CREATE_TIME, TP1_KEY, TP1_VALUE));
+                        consumer.addRecord(new ConsumerRecord<>(TOPIC, 1, 0, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, TP1_KEY, TP1_VALUE));
                     }
                 });
                 consumer.schedulePollTask(new Runnable() {
@@ -298,16 +298,16 @@ public class KafkaBasedLogTest {
                 consumer.schedulePollTask(new Runnable() {
                     @Override
                     public void run() {
-                        consumer.addRecord(new ConsumerRecord<>(TOPIC, 0, 0, 0L, TimestampType.CREATE_TIME, TP0_KEY, TP0_VALUE));
-                        consumer.addRecord(new ConsumerRecord<>(TOPIC, 0, 1, 0L, TimestampType.CREATE_TIME, TP0_KEY, TP0_VALUE_NEW));
-                        consumer.addRecord(new ConsumerRecord<>(TOPIC, 1, 0, 0L, TimestampType.CREATE_TIME, TP1_KEY, TP1_VALUE));
+                        consumer.addRecord(new ConsumerRecord<>(TOPIC, 0, 0, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, TP0_KEY, TP0_VALUE));
+                        consumer.addRecord(new ConsumerRecord<>(TOPIC, 0, 1, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, TP0_KEY, TP0_VALUE_NEW));
+                        consumer.addRecord(new ConsumerRecord<>(TOPIC, 1, 0, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, TP1_KEY, TP1_VALUE));
                     }
                 });
 
                 consumer.schedulePollTask(new Runnable() {
                     @Override
                     public void run() {
-                        consumer.addRecord(new ConsumerRecord<>(TOPIC, 1, 1, 0L, TimestampType.CREATE_TIME, TP1_KEY, TP1_VALUE_NEW));
+                        consumer.addRecord(new ConsumerRecord<>(TOPIC, 1, 1, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, TP1_KEY, TP1_VALUE_NEW));
                     }
                 });
 
@@ -363,8 +363,8 @@ public class KafkaBasedLogTest {
                 consumer.schedulePollTask(new Runnable() {
                     @Override
                     public void run() {
-                        consumer.addRecord(new ConsumerRecord<>(TOPIC, 0, 0, 0L, TimestampType.CREATE_TIME, TP0_KEY, TP0_VALUE_NEW));
-                        consumer.addRecord(new ConsumerRecord<>(TOPIC, 1, 0, 0L, TimestampType.CREATE_TIME, TP0_KEY, TP0_VALUE_NEW));
+                        consumer.addRecord(new ConsumerRecord<>(TOPIC, 0, 0, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, TP0_KEY, TP0_VALUE_NEW));
+                        consumer.addRecord(new ConsumerRecord<>(TOPIC, 1, 0, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, TP0_KEY, TP0_VALUE_NEW));
                     }
                 });
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/002b377d/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
index 855025e..50add72 100755
--- a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
+++ b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
@@ -127,8 +127,8 @@ object ConsoleConsumer extends Logging {
       }
       messageCount += 1
       try {
-        formatter.writeTo(new ConsumerRecord(msg.topic, msg.partition, msg.offset, msg.timestamp, msg.timestampType,
-          msg.key, msg.value), System.out)
+        formatter.writeTo(new ConsumerRecord(msg.topic, msg.partition, msg.offset, msg.timestamp,
+                                             msg.timestampType, 0, 0, 0, msg.key, msg.value), System.out)
       } catch {
         case e: Throwable =>
           if (skipMessageOnError) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/002b377d/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala b/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala
index b4b68e0..6ad68b6 100755
--- a/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala
+++ b/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala
@@ -223,8 +223,10 @@ object SimpleConsumerShell extends Logging {
                 val message = messageAndOffset.message
                 val key = if (message.hasKey) Utils.readBytes(message.key) else null
                 val value = if (message.isNull) null else Utils.readBytes(message.payload)
+                val serializedKeySize = if (message.hasKey) key.size else -1
+                val serializedValueSize = if (message.isNull) -1 else value.size
                 formatter.writeTo(new ConsumerRecord(topic, partitionId, offset, message.timestamp,
-                  message.timestampType, key, value), System.out)
+                  message.timestampType, message.checksum, serializedKeySize, serializedValueSize, key, value), System.out)
                 numMessagesConsumed += 1
               } catch {
                 case e: Throwable =>

http://git-wip-us.apache.org/repos/asf/kafka/blob/002b377d/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala b/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala
index 19a8882..684b38f 100644
--- a/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala
+++ b/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala
@@ -309,6 +309,9 @@ abstract class BaseConsumerTest extends IntegrationTestHarness with Logging {
       val keyAndValueIndex = startingKeyAndValueIndex + i
       assertEquals(s"key $keyAndValueIndex", new String(record.key))
       assertEquals(s"value $keyAndValueIndex", new String(record.value))
+      // this is true only because K and V are byte arrays
+      assertEquals(s"key $keyAndValueIndex".length, record.serializedKeySize)
+      assertEquals(s"value $keyAndValueIndex".length, record.serializedValueSize)
     }
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/002b377d/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala b/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala
index 807b8bb..2d89bf8 100644
--- a/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala
+++ b/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala
@@ -96,6 +96,13 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
           assertEquals(offset, metadata.offset())
           assertEquals(topic, metadata.topic())
           assertEquals(partition, metadata.partition())
+          offset match {
+            case 0 => assertEquals(metadata.serializedKeySize + metadata.serializedValueSize, "key".getBytes.length + "value".getBytes.length)
+            case 1 => assertEquals(metadata.serializedKeySize(), "key".getBytes.length)
+            case 2 => assertEquals(metadata.serializedValueSize, "value".getBytes.length)
+            case _ => assertTrue(metadata.serializedValueSize > 0)
+          }
+          assertNotEquals(metadata.checksum(), 0)
           offset += 1
         } else {
           fail("Send callback returns the following exception", exception)

http://git-wip-us.apache.org/repos/asf/kafka/blob/002b377d/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java
index 62bf307..6911a45 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java
@@ -77,8 +77,11 @@ public class RecordQueue {
             Object key = source.deserializeKey(rawRecord.topic(), rawRecord.key());
             Object value = source.deserializeValue(rawRecord.topic(), rawRecord.value());
 
-            ConsumerRecord<Object, Object> record = new ConsumerRecord<>(rawRecord.topic(), rawRecord.partition(),
-                rawRecord.offset(), rawRecord.timestamp(), TimestampType.CREATE_TIME, key, value);
+            ConsumerRecord<Object, Object> record = new ConsumerRecord<>(rawRecord.topic(), rawRecord.partition(), rawRecord.offset(),
+                                                                         rawRecord.timestamp(), TimestampType.CREATE_TIME,
+                                                                         rawRecord.checksum(),
+                                                                         rawRecord.serializedKeySize(),
+                                                                         rawRecord.serializedValueSize(), key, value);
             long timestamp = timestampExtractor.extract(record);
 
             StampedRecord stampedRecord = new StampedRecord(record, timestamp);

http://git-wip-us.apache.org/repos/asf/kafka/blob/002b377d/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java
index 61f6dbf..5bf1b5e 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java
@@ -60,17 +60,17 @@ public class PartitionGroupTest {
 
         // add three 3 records with timestamp 1, 3, 5 to partition-1
         List<ConsumerRecord<byte[], byte[]>> list1 = Arrays.asList(
-            new ConsumerRecord<>("topic", 1, 1, 0L, TimestampType.CREATE_TIME, recordKey, recordValue),
-            new ConsumerRecord<>("topic", 1, 3, 0L, TimestampType.CREATE_TIME, recordKey, recordValue),
-            new ConsumerRecord<>("topic", 1, 5, 0L, TimestampType.CREATE_TIME, recordKey, recordValue));
+            new ConsumerRecord<>("topic", 1, 1, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue),
+            new ConsumerRecord<>("topic", 1, 3, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue),
+            new ConsumerRecord<>("topic", 1, 5, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue));
 
         group.addRawRecords(partition1, list1);
 
         // add three 3 records with timestamp 2, 4, 6 to partition-2
         List<ConsumerRecord<byte[], byte[]>> list2 = Arrays.asList(
-            new ConsumerRecord<>("topic", 1, 2, 0L, TimestampType.CREATE_TIME, recordKey, recordValue),
-            new ConsumerRecord<>("topic", 1, 4, 0L, TimestampType.CREATE_TIME, recordKey, recordValue),
-            new ConsumerRecord<>("topic", 1, 6, 0L, TimestampType.CREATE_TIME, recordKey, recordValue));
+            new ConsumerRecord<>("topic", 1, 2, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue),
+            new ConsumerRecord<>("topic", 1, 4, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue),
+            new ConsumerRecord<>("topic", 1, 6, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue));
 
         group.addRawRecords(partition2, list2);
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/002b377d/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
index 916079d..14cb493 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
@@ -85,7 +85,7 @@ public class ProcessorStateManagerTest {
         public void bufferRecord(ConsumerRecord<Integer, Integer> record) {
             recordBuffer.add(
                 new ConsumerRecord<>(record.topic(), record.partition(), record.offset(), 0L,
-                    TimestampType.CREATE_TIME,
+                    TimestampType.CREATE_TIME, 0L, 0, 0,
                     serializer.serialize(record.topic(), record.key()),
                     serializer.serialize(record.topic(), record.value())));
             endOffset = record.offset();
@@ -269,7 +269,7 @@ public class ProcessorStateManagerTest {
                     int key = i * 10;
                     expectedKeys.add(key);
                     restoreConsumer.bufferRecord(
-                            new ConsumerRecord<>(persistentStoreTopicName, 2, 0L, offset, TimestampType.CREATE_TIME, key, 0)
+                            new ConsumerRecord<>(persistentStoreTopicName, 2, 0L, offset, TimestampType.CREATE_TIME, 0L, 0, 0, key, 0)
                     );
                 }
 
@@ -322,7 +322,7 @@ public class ProcessorStateManagerTest {
                     int key = i;
                     expectedKeys.add(i);
                     restoreConsumer.bufferRecord(
-                            new ConsumerRecord<>(nonPersistentStoreTopicName, 2, 0L, offset, TimestampType.CREATE_TIME, key, 0)
+                            new ConsumerRecord<>(nonPersistentStoreTopicName, 2, 0L, offset, TimestampType.CREATE_TIME, 0L, 0, 0, key, 0)
                     );
                 }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/002b377d/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java
index 36f38e6..8d9c91c 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java
@@ -51,9 +51,9 @@ public class RecordQueueTest {
 
         // add three 3 out-of-order records with timestamp 2, 1, 3
         List<ConsumerRecord<byte[], byte[]>> list1 = Arrays.asList(
-            new ConsumerRecord<>("topic", 1, 2, 0L, TimestampType.CREATE_TIME, recordKey, recordValue),
-            new ConsumerRecord<>("topic", 1, 1, 0L, TimestampType.CREATE_TIME, recordKey, recordValue),
-            new ConsumerRecord<>("topic", 1, 3, 0L, TimestampType.CREATE_TIME, recordKey, recordValue));
+            new ConsumerRecord<>("topic", 1, 2, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue),
+            new ConsumerRecord<>("topic", 1, 1, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue),
+            new ConsumerRecord<>("topic", 1, 3, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue));
 
         queue.addRawRecords(list1, timestampExtractor);
 
@@ -73,9 +73,9 @@ public class RecordQueueTest {
         // add three 3 out-of-order records with timestamp 4, 1, 2
         // now with 3, 4, 1, 2
         List<ConsumerRecord<byte[], byte[]>> list2 = Arrays.asList(
-            new ConsumerRecord<>("topic", 1, 4, 0L, TimestampType.CREATE_TIME, recordKey, recordValue),
-            new ConsumerRecord<>("topic", 1, 1, 0L, TimestampType.CREATE_TIME, recordKey, recordValue),
-            new ConsumerRecord<>("topic", 1, 2, 0L, TimestampType.CREATE_TIME, recordKey, recordValue));
+            new ConsumerRecord<>("topic", 1, 4, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue),
+            new ConsumerRecord<>("topic", 1, 1, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue),
+            new ConsumerRecord<>("topic", 1, 2, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue));
 
         queue.addRawRecords(list2, timestampExtractor);
 
@@ -100,9 +100,9 @@ public class RecordQueueTest {
 
         // add three more records with 4, 5, 6
         List<ConsumerRecord<byte[], byte[]>> list3 = Arrays.asList(
-            new ConsumerRecord<>("topic", 1, 4, 0L, TimestampType.CREATE_TIME, recordKey, recordValue),
-            new ConsumerRecord<>("topic", 1, 5, 0L, TimestampType.CREATE_TIME, recordKey, recordValue),
-            new ConsumerRecord<>("topic", 1, 6, 0L, TimestampType.CREATE_TIME, recordKey, recordValue));
+            new ConsumerRecord<>("topic", 1, 4, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue),
+            new ConsumerRecord<>("topic", 1, 5, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue),
+            new ConsumerRecord<>("topic", 1, 6, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue));
 
         queue.addRawRecords(list3, timestampExtractor);
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/002b377d/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
index e0be587..295f0dd 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
@@ -153,7 +153,7 @@ public class StandbyTaskTest {
             restoreStateConsumer.assign(new ArrayList<>(task.changeLogPartitions()));
 
             task.update(partition1,
-                    records(new ConsumerRecord<>(partition1.topic(), partition1.partition(), 10, 0L, TimestampType.CREATE_TIME, recordKey, recordValue))
+                    records(new ConsumerRecord<>(partition1.topic(), partition1.partition(), 10, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue))
             );
 
         } finally {
@@ -172,9 +172,9 @@ public class StandbyTaskTest {
             restoreStateConsumer.assign(new ArrayList<>(task.changeLogPartitions()));
 
             for (ConsumerRecord<Integer, Integer> record : Arrays.asList(
-                    new ConsumerRecord<>(partition2.topic(), partition2.partition(), 10, 0L, TimestampType.CREATE_TIME, 1, 100),
-                    new ConsumerRecord<>(partition2.topic(), partition2.partition(), 20, 0L, TimestampType.CREATE_TIME, 2, 100),
-                    new ConsumerRecord<>(partition2.topic(), partition2.partition(), 30, 0L, TimestampType.CREATE_TIME, 3, 100))) {
+                    new ConsumerRecord<>(partition2.topic(), partition2.partition(), 10, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, 1, 100),
+                    new ConsumerRecord<>(partition2.topic(), partition2.partition(), 20, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, 2, 100),
+                    new ConsumerRecord<>(partition2.topic(), partition2.partition(), 30, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, 3, 100))) {
                 restoreStateConsumer.bufferRecord(record);
             }
 
@@ -235,11 +235,11 @@ public class StandbyTaskTest {
             restoreStateConsumer.assign(new ArrayList<>(task.changeLogPartitions()));
 
             for (ConsumerRecord<Integer, Integer> record : Arrays.asList(
-                    new ConsumerRecord<>(ktable.topic(), ktable.partition(), 10, 0L, TimestampType.CREATE_TIME, 1, 100),
-                    new ConsumerRecord<>(ktable.topic(), ktable.partition(), 20, 0L, TimestampType.CREATE_TIME, 2, 100),
-                    new ConsumerRecord<>(ktable.topic(), ktable.partition(), 30, 0L, TimestampType.CREATE_TIME, 3, 100),
-                    new ConsumerRecord<>(ktable.topic(), ktable.partition(), 40, 0L, TimestampType.CREATE_TIME, 4, 100),
-                    new ConsumerRecord<>(ktable.topic(), ktable.partition(), 50, 0L, TimestampType.CREATE_TIME, 5, 100))) {
+                    new ConsumerRecord<>(ktable.topic(), ktable.partition(), 10, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, 1, 100),
+                    new ConsumerRecord<>(ktable.topic(), ktable.partition(), 20, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, 2, 100),
+                    new ConsumerRecord<>(ktable.topic(), ktable.partition(), 30, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, 3, 100),
+                    new ConsumerRecord<>(ktable.topic(), ktable.partition(), 40, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, 4, 100),
+                    new ConsumerRecord<>(ktable.topic(), ktable.partition(), 50, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, 5, 100))) {
                 restoreStateConsumer.bufferRecord(record);
             }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/002b377d/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
index 0430566..1f401db 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
@@ -108,15 +108,15 @@ public class StreamTaskTest {
             StreamTask task = new StreamTask(new TaskId(0, 0), "jobId", partitions, topology, consumer, producer, restoreStateConsumer, config, null);
 
             task.addRecords(partition1, records(
-                    new ConsumerRecord<>(partition1.topic(), partition1.partition(), 10, 0L, TimestampType.CREATE_TIME, recordKey, recordValue),
-                    new ConsumerRecord<>(partition1.topic(), partition1.partition(), 20, 0L, TimestampType.CREATE_TIME, recordKey, recordValue),
-                    new ConsumerRecord<>(partition1.topic(), partition1.partition(), 30, 0L, TimestampType.CREATE_TIME, recordKey, recordValue)
+                    new ConsumerRecord<>(partition1.topic(), partition1.partition(), 10, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue),
+                    new ConsumerRecord<>(partition1.topic(), partition1.partition(), 20, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue),
+                    new ConsumerRecord<>(partition1.topic(), partition1.partition(), 30, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue)
             ));
 
             task.addRecords(partition2, records(
-                    new ConsumerRecord<>(partition2.topic(), partition2.partition(), 25, 0L, TimestampType.CREATE_TIME, recordKey, recordValue),
-                    new ConsumerRecord<>(partition2.topic(), partition2.partition(), 35, 0L, TimestampType.CREATE_TIME, recordKey, recordValue),
-                    new ConsumerRecord<>(partition2.topic(), partition2.partition(), 45, 0L, TimestampType.CREATE_TIME, recordKey, recordValue)
+                    new ConsumerRecord<>(partition2.topic(), partition2.partition(), 25, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue),
+                    new ConsumerRecord<>(partition2.topic(), partition2.partition(), 35, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue),
+                    new ConsumerRecord<>(partition2.topic(), partition2.partition(), 45, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue)
             ));
 
             assertEquals(5, task.process());
@@ -159,15 +159,15 @@ public class StreamTaskTest {
             StreamTask task = new StreamTask(new TaskId(1, 1), "jobId", partitions, topology, consumer, producer, restoreStateConsumer, config, null);
 
             task.addRecords(partition1, records(
-                    new ConsumerRecord<>(partition1.topic(), partition1.partition(), 10, 0L, TimestampType.CREATE_TIME, recordKey, recordValue),
-                    new ConsumerRecord<>(partition1.topic(), partition1.partition(), 20, 0L, TimestampType.CREATE_TIME, recordKey, recordValue)
+                    new ConsumerRecord<>(partition1.topic(), partition1.partition(), 10, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue),
+                    new ConsumerRecord<>(partition1.topic(), partition1.partition(), 20, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue)
             ));
 
             task.addRecords(partition2, records(
-                    new ConsumerRecord<>(partition2.topic(), partition2.partition(), 35, 0L, TimestampType.CREATE_TIME, recordKey, recordValue),
-                    new ConsumerRecord<>(partition2.topic(), partition2.partition(), 45, 0L, TimestampType.CREATE_TIME, recordKey, recordValue),
-                    new ConsumerRecord<>(partition2.topic(), partition2.partition(), 55, 0L, TimestampType.CREATE_TIME, recordKey, recordValue),
-                    new ConsumerRecord<>(partition2.topic(), partition2.partition(), 65, 0L, TimestampType.CREATE_TIME, recordKey, recordValue)
+                    new ConsumerRecord<>(partition2.topic(), partition2.partition(), 35, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue),
+                    new ConsumerRecord<>(partition2.topic(), partition2.partition(), 45, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue),
+                    new ConsumerRecord<>(partition2.topic(), partition2.partition(), 55, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue),
+                    new ConsumerRecord<>(partition2.topic(), partition2.partition(), 65, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue)
             ));
 
             assertEquals(5, task.process());
@@ -178,9 +178,9 @@ public class StreamTaskTest {
             assertTrue(consumer.paused().contains(partition2));
 
             task.addRecords(partition1, records(
-                    new ConsumerRecord<>(partition1.topic(), partition1.partition(), 30, 0L, TimestampType.CREATE_TIME, recordKey, recordValue),
-                    new ConsumerRecord<>(partition1.topic(), partition1.partition(), 40, 0L, TimestampType.CREATE_TIME, recordKey, recordValue),
-                    new ConsumerRecord<>(partition1.topic(), partition1.partition(), 50, 0L, TimestampType.CREATE_TIME, recordKey, recordValue)
+                    new ConsumerRecord<>(partition1.topic(), partition1.partition(), 30, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue),
+                    new ConsumerRecord<>(partition1.topic(), partition1.partition(), 40, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue),
+                    new ConsumerRecord<>(partition1.topic(), partition1.partition(), 50, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue)
             ));
 
             assertEquals(2, consumer.paused().size());

http://git-wip-us.apache.org/repos/asf/kafka/blob/002b377d/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java b/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
index e414d80..34fd10c 100644
--- a/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
@@ -201,7 +201,7 @@ public class ProcessorTopologyTestDriver {
         }
         // Add the record ...
         long offset = offsetsByTopicPartition.get(tp).incrementAndGet();
-        task.addRecords(tp, records(new ConsumerRecord<byte[], byte[]>(tp.topic(), tp.partition(), offset, 0L, TimestampType.CREATE_TIME, key, value)));
+        task.addRecords(tp, records(new ConsumerRecord<byte[], byte[]>(tp.topic(), tp.partition(), offset, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, key, value)));
         producer.clear();
         // Process the record ...
         task.process();