You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ij...@apache.org on 2017/05/25 07:21:10 UTC

kafka git commit: KAFKA-4935; Deprecate client checksum API and compute lazy partial checksum for magic v2

Repository: kafka
Updated Branches:
  refs/heads/trunk fdcee8b8b -> cea319a4a


KAFKA-4935; Deprecate client checksum API and compute lazy partial checksum for magic v2

Author: Jason Gustafson <ja...@confluent.io>

Reviewers: Apurva Mehta <ap...@confluent.io>, Ismael Juma <is...@juma.me.uk>

Closes #3123 from hachikuji/KAFKA-4935


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/cea319a4
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/cea319a4
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/cea319a4

Branch: refs/heads/trunk
Commit: cea319a4ad9c55d3d3263cf7a4224c25772d0e11
Parents: fdcee8b
Author: Jason Gustafson <ja...@confluent.io>
Authored: Thu May 25 07:41:51 2017 +0100
Committer: Ismael Juma <is...@juma.me.uk>
Committed: Thu May 25 08:21:01 2017 +0100

----------------------------------------------------------------------
 .../kafka/clients/consumer/ConsumerRecord.java  | 23 ++++--
 .../clients/consumer/internals/Fetcher.java     |  2 +-
 .../kafka/clients/producer/KafkaProducer.java   |  4 +-
 .../kafka/clients/producer/MockProducer.java    |  7 +-
 .../kafka/clients/producer/RecordMetadata.java  | 43 +++++++----
 .../internals/FutureRecordMetadata.java         | 10 +--
 .../producer/internals/ProducerBatch.java       |  4 +-
 .../internals/ProducerInterceptors.java         |  6 +-
 .../common/header/internals/RecordHeaders.java  |  6 +-
 .../record/AbstractLegacyRecordBatch.java       |  5 ++
 .../kafka/common/record/DefaultRecord.java      | 67 +++++++----------
 .../kafka/common/record/DefaultRecordBatch.java |  3 +-
 .../common/record/MemoryRecordsBuilder.java     | 71 ++++++++++--------
 .../org/apache/kafka/common/record/Record.java  |  5 +-
 .../clients/consumer/ConsumerRecordTest.java    | 12 +++
 .../clients/producer/RecordMetadataTest.java    | 79 ++++++++++++++++++++
 .../kafka/clients/producer/RecordSendTest.java  |  6 +-
 .../producer/internals/ProducerBatchTest.java   | 29 +++++++
 .../internals/ProducerInterceptorsTest.java     |  2 +-
 .../internals/RecordAccumulatorTest.java        |  8 +-
 .../common/record/MemoryRecordsBuilderTest.java | 16 ++++
 .../kafka/test/MockConsumerInterceptor.java     |  3 +-
 .../scala/kafka/tools/DumpLogSegments.scala     |  6 +-
 docs/upgrade.html                               |  2 +-
 24 files changed, 285 insertions(+), 134 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/cea319a4/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecord.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecord.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecord.java
index 464091a..7f85246 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecord.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecord.java
@@ -18,6 +18,7 @@ package org.apache.kafka.clients.consumer;
 
 import org.apache.kafka.common.header.Headers;
 import org.apache.kafka.common.header.internals.RecordHeaders;
+import org.apache.kafka.common.record.DefaultRecord;
 import org.apache.kafka.common.record.RecordBatch;
 import org.apache.kafka.common.record.TimestampType;
 
@@ -36,13 +37,14 @@ public class ConsumerRecord<K, V> {
     private final long offset;
     private final long timestamp;
     private final TimestampType timestampType;
-    private final long checksum;
     private final int serializedKeySize;
     private final int serializedValueSize;
     private final Headers headers;
     private final K key;
     private final V value;
 
+    private volatile Long checksum;
+
     /**
      * Creates a record to be received from a specified topic and partition (provided for
      * compatibility with Kafka 0.9 before the message format supported timestamps and before
@@ -63,7 +65,6 @@ public class ConsumerRecord<K, V> {
                 NULL_CHECKSUM, NULL_SIZE, NULL_SIZE, key, value);
     }
 
-
     /**
      * Creates a record to be received from a specified topic and partition (provided for
      * compatibility with Kafka 0.10 before the message format supported headers).
@@ -89,7 +90,8 @@ public class ConsumerRecord<K, V> {
                           int serializedValueSize,
                           K key,
                           V value) {
-        this(topic, partition, offset, timestamp, timestampType, checksum, serializedKeySize, serializedValueSize, key, value, new RecordHeaders());
+        this(topic, partition, offset, timestamp, timestampType, checksum, serializedKeySize, serializedValueSize,
+                key, value, new RecordHeaders());
     }
 
     /**
@@ -112,7 +114,7 @@ public class ConsumerRecord<K, V> {
                           long offset,
                           long timestamp,
                           TimestampType timestampType,
-                          long checksum,
+                          Long checksum,
                           int serializedKeySize,
                           int serializedValueSize,
                           K key,
@@ -191,8 +193,19 @@ public class ConsumerRecord<K, V> {
 
     /**
      * The checksum (CRC32) of the record.
+     *
+     * @deprecated As of Kafka 0.11.0. Because of the potential for message format conversion on the broker, the
+     *             checksum returned by the broker may not match what was computed by the producer.
+     *             It is therefore unsafe to depend on this checksum for end-to-end delivery guarantees. Additionally,
+     *             message format v2 does not include a record-level checksum (for performance, the record checksum
+     *             was replaced with a batch checksum). To maintain compatibility, a partial checksum computed from
+     *             the record timestamp, serialized key size, and serialized value size is returned instead, but
+     *             this should not be depended on for end-to-end reliability.
      */
+    @Deprecated
     public long checksum() {
+        if (checksum == null)
+            this.checksum = DefaultRecord.computePartialChecksum(timestamp, serializedKeySize, serializedValueSize);
         return this.checksum;
     }
 
@@ -215,7 +228,7 @@ public class ConsumerRecord<K, V> {
     @Override
     public String toString() {
         return "ConsumerRecord(topic = " + topic() + ", partition = " + partition() + ", offset = " + offset()
-               + ", " + timestampType + " = " + timestamp + ", checksum = " + checksum
+               + ", " + timestampType + " = " + timestamp
                + ", serialized key size = "  + serializedKeySize
                + ", serialized value size = " + serializedValueSize
                + ", headers = " + headers

http://git-wip-us.apache.org/repos/asf/kafka/blob/cea319a4/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
index 6917a1d..a79ea5d 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
@@ -917,7 +917,7 @@ public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable {
             byte[] valueByteArray = valueBytes == null ? null : Utils.toArray(valueBytes);
             V value = valueBytes == null ? null : this.valueDeserializer.deserialize(partition.topic(), headers, valueByteArray);
             return new ConsumerRecord<>(partition.topic(), partition.partition(), offset,
-                                        timestamp, timestampType, record.checksum(),
+                                        timestamp, timestampType, record.checksumOrNull(),
                                         keyByteArray == null ? ConsumerRecord.NULL_SIZE : keyByteArray.length,
                                         valueByteArray == null ? ConsumerRecord.NULL_SIZE : valueByteArray.length,
                                         key, value, headers);

http://git-wip-us.apache.org/repos/asf/kafka/blob/cea319a4/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index c11ecc7..22baf3c 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -1014,8 +1014,8 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
         public void onCompletion(RecordMetadata metadata, Exception exception) {
             if (this.interceptors != null) {
                 if (metadata == null) {
-                    this.interceptors.onAcknowledgement(new RecordMetadata(tp, -1, -1, RecordBatch.NO_TIMESTAMP, -1, -1, -1),
-                                                        exception);
+                    this.interceptors.onAcknowledgement(new RecordMetadata(tp, -1, -1, RecordBatch.NO_TIMESTAMP,
+                                    Long.valueOf(-1L), -1, -1), exception);
                 } else {
                     this.interceptors.onAcknowledgement(metadata, exception);
                 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/cea319a4/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java
index 22fa755..566e43a 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java
@@ -251,11 +251,10 @@ public class MockProducer<K, V> implements Producer<K, V> {
             partition = partition(record, this.cluster);
         TopicPartition topicPartition = new TopicPartition(record.topic(), partition);
         ProduceRequestResult result = new ProduceRequestResult(topicPartition);
-        FutureRecordMetadata future = new FutureRecordMetadata(result, 0, RecordBatch.NO_TIMESTAMP, 0, 0, 0);
+        FutureRecordMetadata future = new FutureRecordMetadata(result, 0, RecordBatch.NO_TIMESTAMP, 0L, 0, 0);
         long offset = nextOffset(topicPartition);
-        Completion completion = new Completion(offset,
-                                               new RecordMetadata(topicPartition, 0, offset, RecordBatch.NO_TIMESTAMP, 0, 0, 0),
-                                               result, callback);
+        Completion completion = new Completion(offset, new RecordMetadata(topicPartition, 0, offset,
+                RecordBatch.NO_TIMESTAMP, Long.valueOf(0L), 0, 0), result, callback);
 
         if (!this.transactionInFlight)
             this.sent.add(record);

http://git-wip-us.apache.org/repos/asf/kafka/blob/cea319a4/clients/src/main/java/org/apache/kafka/clients/producer/RecordMetadata.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/RecordMetadata.java b/clients/src/main/java/org/apache/kafka/clients/producer/RecordMetadata.java
index 2d06ea8..6757a6d 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/RecordMetadata.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/RecordMetadata.java
@@ -17,7 +17,7 @@
 package org.apache.kafka.clients.producer;
 
 import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.common.record.DefaultRecord;
 
 /**
  * The metadata for a record that has been acknowledged by the server
@@ -36,15 +36,17 @@ public final class RecordMetadata {
     // user provided one. Otherwise, it will be the producer local time when the producer record was handed to the
     // producer.
     private final long timestamp;
-    private final long checksum;
     private final int serializedKeySize;
     private final int serializedValueSize;
     private final TopicPartition topicPartition;
 
-    private RecordMetadata(TopicPartition topicPartition, long offset, long timestamp, long
-        checksum, int serializedKeySize, int serializedValueSize) {
-        super();
-        this.offset = offset;
+    private volatile Long checksum;
+
+    public RecordMetadata(TopicPartition topicPartition, long baseOffset, long relativeOffset, long timestamp,
+                          Long checksum, int serializedKeySize, int serializedValueSize) {
+        // ignore the relativeOffset if the base offset is -1,
+        // since this indicates the offset is unknown
+        this.offset = baseOffset == -1 ? baseOffset : baseOffset + relativeOffset;
         this.timestamp = timestamp;
         this.checksum = checksum;
         this.serializedKeySize = serializedKeySize;
@@ -52,17 +54,14 @@ public final class RecordMetadata {
         this.topicPartition = topicPartition;
     }
 
+    /**
+     * @deprecated As of 0.11.0. Use {@link RecordMetadata#RecordMetadata(TopicPartition, long, long, long, Long, int, int)}.
+     */
     @Deprecated
-    public RecordMetadata(TopicPartition topicPartition, long baseOffset, long relativeOffset) {
-        this(topicPartition, baseOffset, relativeOffset, RecordBatch.NO_TIMESTAMP, -1, -1, -1);
-    }
-
-    public RecordMetadata(TopicPartition topicPartition, long baseOffset, long relativeOffset,
-                          long timestamp, long checksum, int serializedKeySize, int serializedValueSize) {
-        // ignore the relativeOffset if the base offset is -1,
-        // since this indicates the offset is unknown
-        this(topicPartition, baseOffset == -1 ? baseOffset : baseOffset + relativeOffset,
-             timestamp, checksum, serializedKeySize, serializedValueSize);
+    public RecordMetadata(TopicPartition topicPartition, long baseOffset, long relativeOffset, long timestamp,
+                          long checksum, int serializedKeySize, int serializedValueSize) {
+        this(topicPartition, baseOffset, relativeOffset, timestamp, Long.valueOf(checksum), serializedKeySize,
+                serializedValueSize);
     }
 
     /**
@@ -81,8 +80,20 @@ public final class RecordMetadata {
 
     /**
      * The checksum (CRC32) of the record.
+     *
+     * @deprecated As of Kafka 0.11.0. Because of the potential for message format conversion on the broker, the
+     *             computed checksum may not match what was stored on the broker, or what will be returned to the consumer.
+     *             It is therefore unsafe to depend on this checksum for end-to-end delivery guarantees. Additionally,
+     *             message format v2 does not include a record-level checksum (for performance, the record checksum
+     *             was replaced with a batch checksum). To maintain compatibility, a partial checksum computed from
+     *             the record timestamp, serialized key size, and serialized value size is returned instead, but
+     *             this should not be depended on for end-to-end reliability.
      */
+    @Deprecated
     public long checksum() {
+        if (checksum == null)
+            // The checksum is null only for message format v2 and above, which do not have a record-level checksum.
+            this.checksum = DefaultRecord.computePartialChecksum(timestamp, serializedKeySize, serializedValueSize);
         return this.checksum;
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/cea319a4/clients/src/main/java/org/apache/kafka/clients/producer/internals/FutureRecordMetadata.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/FutureRecordMetadata.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/FutureRecordMetadata.java
index 1de965f..8fcc46f 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/FutureRecordMetadata.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/FutureRecordMetadata.java
@@ -31,13 +31,13 @@ public final class FutureRecordMetadata implements Future<RecordMetadata> {
     private final ProduceRequestResult result;
     private final long relativeOffset;
     private final long createTimestamp;
-    private final long checksum;
+    private final Long checksum;
     private final int serializedKeySize;
     private final int serializedValueSize;
     private volatile FutureRecordMetadata nextRecordMetadata = null;
 
     public FutureRecordMetadata(ProduceRequestResult result, long relativeOffset, long createTimestamp,
-                                long checksum, int serializedKeySize, int serializedValueSize) {
+                                Long checksum, int serializedKeySize, int serializedValueSize) {
         this.result = result;
         this.relativeOffset = relativeOffset;
         this.createTimestamp = createTimestamp;
@@ -96,14 +96,10 @@ public final class FutureRecordMetadata implements Future<RecordMetadata> {
             return value();
     }
 
-    long checksum() {
+    Long checksumOrNull() {
         return this.checksum;
     }
 
-    long relativeOffset() {
-        return this.relativeOffset;
-    }
-
     RecordMetadata value() {
         if (nextRecordMetadata != null)
             return nextRecordMetadata.value();

http://git-wip-us.apache.org/repos/asf/kafka/blob/cea319a4/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java
index cdf85ce..df79707 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java
@@ -103,7 +103,7 @@ public final class ProducerBatch {
         if (!recordsBuilder.hasRoomFor(timestamp, key, value)) {
             return null;
         } else {
-            long checksum = this.recordsBuilder.append(timestamp, key, value, headers);
+            Long checksum = this.recordsBuilder.append(timestamp, key, value, headers);
             this.maxRecordSize = Math.max(this.maxRecordSize, AbstractRecords.sizeInBytesUpperBound(magic(), key, value, headers));
             this.lastAppendTime = now;
             FutureRecordMetadata future = new FutureRecordMetadata(this.produceFuture, this.recordCount,
@@ -131,7 +131,7 @@ public final class ProducerBatch {
             this.maxRecordSize = Math.max(this.maxRecordSize,
                                           AbstractRecords.sizeInBytesUpperBound(magic(), key, value, headers));
             FutureRecordMetadata future = new FutureRecordMetadata(this.produceFuture, this.recordCount,
-                                                                   timestamp, thunk.future.checksum(),
+                                                                   timestamp, thunk.future.checksumOrNull(),
                                                                    key == null ? -1 : key.remaining(),
                                                                    value == null ? -1 : value.remaining());
             // Chain the future to the original thunk.

http://git-wip-us.apache.org/repos/asf/kafka/blob/cea319a4/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerInterceptors.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerInterceptors.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerInterceptors.java
index e4ab4c6..61a8b7a 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerInterceptors.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerInterceptors.java
@@ -111,10 +111,10 @@ public class ProducerInterceptors<K, V> implements Closeable {
                 } else {
                     if (interceptTopicPartition == null) {
                         interceptTopicPartition = new TopicPartition(record.topic(),
-                                                                     record.partition() == null ? RecordMetadata.UNKNOWN_PARTITION : record.partition());
+                                record.partition() == null ? RecordMetadata.UNKNOWN_PARTITION : record.partition());
                     }
-                    interceptor.onAcknowledgement(new RecordMetadata(interceptTopicPartition, -1, -1, RecordBatch.NO_TIMESTAMP, -1, -1, -1),
-                                                  exception);
+                    interceptor.onAcknowledgement(new RecordMetadata(interceptTopicPartition, -1, -1,
+                                    RecordBatch.NO_TIMESTAMP, Long.valueOf(-1L), -1, -1), exception);
                 }
             } catch (Exception e) {
                 // do not propagate interceptor exceptions, just log

http://git-wip-us.apache.org/repos/asf/kafka/blob/cea319a4/clients/src/main/java/org/apache/kafka/common/header/internals/RecordHeaders.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/header/internals/RecordHeaders.java b/clients/src/main/java/org/apache/kafka/common/header/internals/RecordHeaders.java
index f23d799..afd991f 100644
--- a/clients/src/main/java/org/apache/kafka/common/header/internals/RecordHeaders.java
+++ b/clients/src/main/java/org/apache/kafka/common/header/internals/RecordHeaders.java
@@ -54,10 +54,8 @@ public class RecordHeaders implements Headers {
             this.headers = new ArrayList<>((Collection<Header>) headers);
         } else {
             this.headers = new ArrayList<>();
-            Iterator<Header> iterator = headers.iterator();
-            while (iterator.hasNext()) {
-                this.headers.add(iterator.next());
-            }
+            for (Header header : headers)
+                this.headers.add(header);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/cea319a4/clients/src/main/java/org/apache/kafka/common/record/AbstractLegacyRecordBatch.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/AbstractLegacyRecordBatch.java b/clients/src/main/java/org/apache/kafka/common/record/AbstractLegacyRecordBatch.java
index 7be4bdd..6ce3ba3 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/AbstractLegacyRecordBatch.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/AbstractLegacyRecordBatch.java
@@ -110,6 +110,11 @@ public abstract class AbstractLegacyRecordBatch extends AbstractRecordBatch impl
     }
 
     @Override
+    public Long checksumOrNull() {
+        return checksum();
+    }
+
+    @Override
     public long checksum() {
         return outerRecord().checksum();
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/cea319a4/clients/src/main/java/org/apache/kafka/common/record/DefaultRecord.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/DefaultRecord.java b/clients/src/main/java/org/apache/kafka/common/record/DefaultRecord.java
index 37f92d2..9d0cd7e 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/DefaultRecord.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/DefaultRecord.java
@@ -77,7 +77,6 @@ public class DefaultRecord implements Record {
     private final ByteBuffer key;
     private final ByteBuffer value;
     private final Header[] headers;
-    private Long checksum = null;
 
     private DefaultRecord(int sizeInBytes,
                           byte attributes,
@@ -122,10 +121,8 @@ public class DefaultRecord implements Record {
     }
 
     @Override
-    public long checksum() {
-        if (checksum == null)
-            checksum = computeChecksum(timestamp, key, value);
-        return checksum;
+    public Long checksumOrNull() {
+        return null;
     }
 
     @Override
@@ -174,14 +171,14 @@ public class DefaultRecord implements Record {
     }
 
     /**
-     * Write the record to `out` and return its crc.
+     * Write the record to `out` and return its size.
      */
-    public static long writeTo(DataOutputStream out,
-                               int offsetDelta,
-                               long timestampDelta,
-                               ByteBuffer key,
-                               ByteBuffer value,
-                               Header[] headers) throws IOException {
+    public static int writeTo(DataOutputStream out,
+                              int offsetDelta,
+                              long timestampDelta,
+                              ByteBuffer key,
+                              ByteBuffer value,
+                              Header[] headers) throws IOException {
         int sizeInBytes = sizeOfBodyInBytes(offsetDelta, timestampDelta, key, value, headers);
         ByteUtils.writeVarint(sizeInBytes, out);
 
@@ -230,18 +227,18 @@ public class DefaultRecord implements Record {
             }
         }
 
-        return computeChecksum(timestampDelta, key, value);
+        return ByteUtils.sizeOfVarint(sizeInBytes) + sizeInBytes;
     }
 
     /**
-     * Write the record to `out` and return its crc.
+     * Write the record to `out` and return its size.
      */
-    public static long writeTo(ByteBuffer out,
-                               int offsetDelta,
-                               long timestampDelta,
-                               ByteBuffer key,
-                               ByteBuffer value,
-                               Header[] headers) {
+    public static int writeTo(ByteBuffer out,
+                              int offsetDelta,
+                              long timestampDelta,
+                              ByteBuffer key,
+                              ByteBuffer value,
+                              Header[] headers) {
         try {
             return writeTo(new DataOutputStream(new ByteBufferOutputStream(out)), offsetDelta, timestampDelta,
                     key, value, headers);
@@ -251,24 +248,6 @@ public class DefaultRecord implements Record {
         }
     }
 
-    /**
-     * Compute the checksum of the record from the timestamp, key and value payloads
-     */
-    private static long computeChecksum(long timestamp,
-                                        ByteBuffer key,
-                                        ByteBuffer value) {
-        Checksum crc = Crc32C.create();
-        Checksums.updateLong(crc, timestamp);
-
-        if (key != null)
-            Checksums.update(crc, key, key.remaining());
-
-        if (value != null)
-            Checksums.update(crc, value, value.remaining());
-
-        return crc.getValue();
-    }
-
     @Override
     public boolean hasMagic(byte magic) {
         return magic >= MAGIC_VALUE_V2;
@@ -493,14 +472,18 @@ public class DefaultRecord implements Record {
         return size;
     }
 
-    static int recordSizeUpperBound(byte[] key, byte[] value, Header[] headers) {
-        return recordSizeUpperBound(Utils.wrapNullable(key), Utils.wrapNullable(value), headers);
-    }
-
     static int recordSizeUpperBound(ByteBuffer key, ByteBuffer value, Header[] headers) {
         int keySize = key == null ? -1 : key.remaining();
         int valueSize = value == null ? -1 : value.remaining();
         return MAX_RECORD_OVERHEAD + sizeOf(keySize, valueSize, headers);
     }
 
+
+    public static long computePartialChecksum(long timestamp, int serializedKeySize, int serializedValueSize) {
+        Checksum checksum = Crc32C.create();
+        Checksums.updateLong(checksum, timestamp);
+        Checksums.updateInt(checksum, serializedKeySize);
+        Checksums.updateInt(checksum, serializedValueSize);
+        return checksum.getValue();
+    }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/cea319a4/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java b/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java
index 589e67c..13f958d 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java
@@ -22,6 +22,7 @@ import org.apache.kafka.common.utils.ByteBufferInputStream;
 import org.apache.kafka.common.utils.ByteUtils;
 import org.apache.kafka.common.utils.CloseableIterator;
 import org.apache.kafka.common.utils.Crc32C;
+import org.apache.kafka.common.utils.Utils;
 
 import java.io.DataInputStream;
 import java.io.IOException;
@@ -30,7 +31,6 @@ import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
 import java.util.NoSuchElementException;
-import org.apache.kafka.common.utils.Utils;
 
 import static org.apache.kafka.common.record.Records.LOG_OVERHEAD;
 
@@ -493,4 +493,5 @@ public class DefaultRecordBatch extends AbstractRecordBatch implements MutableRe
         }
 
     }
+
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/cea319a4/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
index 42ae0f8..bc25d75 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
@@ -344,7 +344,10 @@ public class MemoryRecordsBuilder {
         return writtenCompressed;
     }
 
-    private long appendWithOffset(long offset, boolean isControlRecord, long timestamp, ByteBuffer key,
+    /**
+     * Append a record and return its checksum for message format v0 and v1, or null for v2 and above.
+     */
+    private Long appendWithOffset(long offset, boolean isControlRecord, long timestamp, ByteBuffer key,
                                   ByteBuffer value, Header[] headers) {
         try {
             if (isControlRecord != isControlBatch)
@@ -363,10 +366,12 @@ public class MemoryRecordsBuilder {
             if (baseTimestamp == null)
                 baseTimestamp = timestamp;
 
-            if (magic > RecordBatch.MAGIC_VALUE_V1)
-                return appendDefaultRecord(offset, timestamp, key, value, headers);
-            else
+            if (magic > RecordBatch.MAGIC_VALUE_V1) {
+                appendDefaultRecord(offset, timestamp, key, value, headers);
+                return null;
+            } else {
                 return appendLegacyRecord(offset, timestamp, key, value);
+            }
         } catch (IOException e) {
             throw new KafkaException("I/O exception when writing to the append stream, closing", e);
         }
@@ -379,9 +384,9 @@ public class MemoryRecordsBuilder {
      * @param key The record key
      * @param value The record value
      * @param headers The record headers if there are any
-     * @return crc of the record
+     * @return CRC of the record or null if record-level CRC is not supported for the message format
      */
-    public long appendWithOffset(long offset, long timestamp, byte[] key, byte[] value, Header[] headers) {
+    public Long appendWithOffset(long offset, long timestamp, byte[] key, byte[] value, Header[] headers) {
         return appendWithOffset(offset, false, timestamp, wrapNullable(key), wrapNullable(value), headers);
     }
 
@@ -392,9 +397,9 @@ public class MemoryRecordsBuilder {
      * @param key The record key
      * @param value The record value
      * @param headers The record headers if there are any
-     * @return crc of the record
+     * @return CRC of the record or null if record-level CRC is not supported for the message format
      */
-    public long appendWithOffset(long offset, long timestamp, ByteBuffer key, ByteBuffer value, Header[] headers) {
+    public Long appendWithOffset(long offset, long timestamp, ByteBuffer key, ByteBuffer value, Header[] headers) {
         return appendWithOffset(offset, false, timestamp, key, value, headers);
     }
 
@@ -404,9 +409,9 @@ public class MemoryRecordsBuilder {
      * @param timestamp The record timestamp
      * @param key The record key
      * @param value The record value
-     * @return crc of the record
+     * @return CRC of the record or null if record-level CRC is not supported for the message format
      */
-    public long appendWithOffset(long offset, long timestamp, byte[] key, byte[] value) {
+    public Long appendWithOffset(long offset, long timestamp, byte[] key, byte[] value) {
         return appendWithOffset(offset, timestamp, wrapNullable(key), wrapNullable(value), Record.EMPTY_HEADERS);
     }
 
@@ -416,9 +421,9 @@ public class MemoryRecordsBuilder {
      * @param timestamp The record timestamp
      * @param key The record key
      * @param value The record value
-     * @return crc of the record
+     * @return CRC of the record or null if record-level CRC is not supported for the message format
      */
-    public long appendWithOffset(long offset, long timestamp, ByteBuffer key, ByteBuffer value) {
+    public Long appendWithOffset(long offset, long timestamp, ByteBuffer key, ByteBuffer value) {
         return appendWithOffset(offset, timestamp, key, value, Record.EMPTY_HEADERS);
     }
 
@@ -426,21 +431,20 @@ public class MemoryRecordsBuilder {
      * Append a new record at the given offset.
      * @param offset The absolute offset of the record in the log buffer
      * @param record The record to append
-     * @return crc of the record
+     * @return CRC of the record or null if record-level CRC is not supported for the message format
      */
-    public long appendWithOffset(long offset, SimpleRecord record) {
+    public Long appendWithOffset(long offset, SimpleRecord record) {
         return appendWithOffset(offset, record.timestamp(), record.key(), record.value(), record.headers());
     }
 
-
     /**
      * Append a new record at the next sequential offset.
      * @param timestamp The record timestamp
      * @param key The record key
      * @param value The record value
-     * @return crc of the record
+     * @return CRC of the record or null if record-level CRC is not supported for the message format
      */
-    public long append(long timestamp, ByteBuffer key, ByteBuffer value) {
+    public Long append(long timestamp, ByteBuffer key, ByteBuffer value) {
         return append(timestamp, key, value, Record.EMPTY_HEADERS);
     }
 
@@ -450,9 +454,9 @@ public class MemoryRecordsBuilder {
      * @param key The record key
      * @param value The record value
      * @param headers The record headers if there are any
-     * @return crc of the record
+     * @return CRC of the record or null if record-level CRC is not supported for the message format
      */
-    public long append(long timestamp, ByteBuffer key, ByteBuffer value, Header[] headers) {
+    public Long append(long timestamp, ByteBuffer key, ByteBuffer value, Header[] headers) {
         return appendWithOffset(nextSequentialOffset(), timestamp, key, value, headers);
     }
 
@@ -461,9 +465,9 @@ public class MemoryRecordsBuilder {
      * @param timestamp The record timestamp
      * @param key The record key
      * @param value The record value
-     * @return crc of the record
+     * @return CRC of the record or null if record-level CRC is not supported for the message format
      */
-    public long append(long timestamp, byte[] key, byte[] value) {
+    public Long append(long timestamp, byte[] key, byte[] value) {
         return append(timestamp, wrapNullable(key), wrapNullable(value), Record.EMPTY_HEADERS);
     }
 
@@ -473,18 +477,18 @@ public class MemoryRecordsBuilder {
      * @param key The record key
      * @param value The record value
      * @param headers The record headers if there are any
-     * @return crc of the record
+     * @return CRC of the record or null if record-level CRC is not supported for the message format
      */
-    public long append(long timestamp, byte[] key, byte[] value, Header[] headers) {
+    public Long append(long timestamp, byte[] key, byte[] value, Header[] headers) {
         return append(timestamp, wrapNullable(key), wrapNullable(value), headers);
     }
 
     /**
      * Append a new record at the next sequential offset.
      * @param record The record to append
-     * @return crc of the record
+     * @return CRC of the record or null if record-level CRC is not supported for the message format
      */
-    public long append(SimpleRecord record) {
+    public Long append(SimpleRecord record) {
         return appendWithOffset(nextSequentialOffset(), record);
     }
 
@@ -493,9 +497,9 @@ public class MemoryRecordsBuilder {
      * @param timestamp The record timestamp
      * @param type The control record type (cannot be UNKNOWN)
      * @param value The control record value
-     * @return crc of the record
+     * @return CRC of the record or null if record-level CRC is not supported for the message format
      */
-    private long appendControlRecord(long timestamp, ControlRecordType type, ByteBuffer value) {
+    private Long appendControlRecord(long timestamp, ControlRecordType type, ByteBuffer value) {
         Struct keyStruct = type.recordKey();
         ByteBuffer key = ByteBuffer.allocate(keyStruct.sizeOf());
         keyStruct.writeTo(key);
@@ -503,7 +507,10 @@ public class MemoryRecordsBuilder {
         return appendWithOffset(nextSequentialOffset(), true, timestamp, key, value, Record.EMPTY_HEADERS);
     }
 
-    public long appendEndTxnMarker(long timestamp, EndTransactionMarker marker) {
+    /**
+     * Return CRC of the record or null if record-level CRC is not supported for the message format
+     */
+    public Long appendEndTxnMarker(long timestamp, EndTransactionMarker marker) {
         if (producerId == RecordBatch.NO_PRODUCER_ID)
             throw new IllegalArgumentException("End transaction marker requires a valid producerId");
         if (!isTransactional)
@@ -568,15 +575,13 @@ public class MemoryRecordsBuilder {
         appendWithOffset(nextSequentialOffset(), record);
     }
 
-    private long appendDefaultRecord(long offset, long timestamp, ByteBuffer key, ByteBuffer value,
+    private void appendDefaultRecord(long offset, long timestamp, ByteBuffer key, ByteBuffer value,
                                      Header[] headers) throws IOException {
         ensureOpenForRecordAppend();
         int offsetDelta = (int) (offset - baseOffset);
         long timestampDelta = timestamp - baseTimestamp;
-        long crc = DefaultRecord.writeTo(appendStream, offsetDelta, timestampDelta, key, value, headers);
-        // TODO: The crc is useless for the new message format. Maybe we should let writeTo return the written size?
-        recordWritten(offset, timestamp, DefaultRecord.sizeInBytes(offsetDelta, timestampDelta, key, value, headers));
-        return crc;
+        int sizeInBytes = DefaultRecord.writeTo(appendStream, offsetDelta, timestampDelta, key, value, headers);
+        recordWritten(offset, timestamp, sizeInBytes);
     }
 
     private long appendLegacyRecord(long offset, long timestamp, ByteBuffer key, ByteBuffer value) throws IOException {

http://git-wip-us.apache.org/repos/asf/kafka/blob/cea319a4/clients/src/main/java/org/apache/kafka/common/record/Record.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/Record.java b/clients/src/main/java/org/apache/kafka/common/record/Record.java
index cba6fc5..6de28c3 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/Record.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/Record.java
@@ -54,9 +54,10 @@ public interface Record {
 
     /**
      * Get a checksum of the record contents.
-     * @return a 4-byte unsigned checksum represented as a long
+     * @return A 4-byte unsigned checksum represented as a long or null if the message format does not
+     *         include a checksum (i.e. for v2 and above)
      */
-    long checksum();
+    Long checksumOrNull();
 
     /**
      * Check whether the record has a valid checksum.

http://git-wip-us.apache.org/repos/asf/kafka/blob/cea319a4/clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerRecordTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerRecordTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerRecordTest.java
index a8a5283..3273645 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerRecordTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerRecordTest.java
@@ -17,6 +17,7 @@
 package org.apache.kafka.clients.consumer;
 
 import org.apache.kafka.common.header.internals.RecordHeaders;
+import org.apache.kafka.common.record.DefaultRecord;
 import org.apache.kafka.common.record.TimestampType;
 import org.junit.Test;
 
@@ -25,6 +26,7 @@ import static org.junit.Assert.assertEquals;
 public class ConsumerRecordTest {
 
     @Test
+    @SuppressWarnings("deprecation")
     public void testOldConstructor() {
         String topic = "topic";
         int partition = 0;
@@ -46,5 +48,15 @@ public class ConsumerRecordTest {
         assertEquals(new RecordHeaders(), record.headers());
     }
 
+    @Test
+    @SuppressWarnings("deprecation")
+    public void testNullChecksumInConstructor() {
+        String key = "key";
+        String value = "value";
+        long timestamp = 242341324L;
+        ConsumerRecord<String, String> record = new ConsumerRecord<>("topic", 0, 23L, timestamp,
+                TimestampType.CREATE_TIME, null, key.length(), value.length(), key, value, new RecordHeaders());
+        assertEquals(DefaultRecord.computePartialChecksum(timestamp, key.length(), value.length()), record.checksum());
+    }
 
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/cea319a4/clients/src/test/java/org/apache/kafka/clients/producer/RecordMetadataTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/RecordMetadataTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/RecordMetadataTest.java
new file mode 100644
index 0000000..a735a61
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/RecordMetadataTest.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.producer;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.record.DefaultRecord;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+public class RecordMetadataTest {
+
+    @Test
+    @SuppressWarnings("deprecation")
+    public void testConstructionWithMissingRelativeOffset() {
+        TopicPartition tp = new TopicPartition("foo", 0);
+        long timestamp = 2340234L;
+        int keySize = 3;
+        int valueSize = 5;
+        Long checksum = 908923L;
+
+        RecordMetadata metadata = new RecordMetadata(tp, -1L, -1L, timestamp, checksum, keySize, valueSize);
+        assertEquals(tp.topic(), metadata.topic());
+        assertEquals(tp.partition(), metadata.partition());
+        assertEquals(timestamp, metadata.timestamp());
+        assertEquals(-1L, metadata.offset());
+        assertEquals(checksum.longValue(), metadata.checksum());
+        assertEquals(keySize, metadata.serializedKeySize());
+        assertEquals(valueSize, metadata.serializedValueSize());
+    }
+
+    @Test
+    @SuppressWarnings("deprecation")
+    public void testConstructionWithRelativeOffset() {
+        TopicPartition tp = new TopicPartition("foo", 0);
+        long timestamp = 2340234L;
+        int keySize = 3;
+        int valueSize = 5;
+        long baseOffset = 15L;
+        long relativeOffset = 3L;
+        Long checksum = 908923L;
+
+        RecordMetadata metadata = new RecordMetadata(tp, baseOffset, relativeOffset, timestamp, checksum,
+                keySize, valueSize);
+        assertEquals(tp.topic(), metadata.topic());
+        assertEquals(tp.partition(), metadata.partition());
+        assertEquals(timestamp, metadata.timestamp());
+        assertEquals(baseOffset + relativeOffset, metadata.offset());
+        assertEquals(checksum.longValue(), metadata.checksum());
+        assertEquals(keySize, metadata.serializedKeySize());
+        assertEquals(valueSize, metadata.serializedValueSize());
+    }
+
+    @Test
+    @SuppressWarnings("deprecation")
+    public void testNullChecksum() {
+        long timestamp = 2340234L;
+        int keySize = 3;
+        int valueSize = 5;
+        RecordMetadata metadata = new RecordMetadata(new TopicPartition("foo", 0), 15L, 3L, timestamp, null,
+                keySize, valueSize);
+        assertEquals(DefaultRecord.computePartialChecksum(timestamp, keySize, valueSize), metadata.checksum());
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/cea319a4/clients/src/test/java/org/apache/kafka/clients/producer/RecordSendTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/RecordSendTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/RecordSendTest.java
index 90ff16a..c083db3 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/RecordSendTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/RecordSendTest.java
@@ -46,7 +46,7 @@ public class RecordSendTest {
     public void testTimeout() throws Exception {
         ProduceRequestResult request = new ProduceRequestResult(topicPartition);
         FutureRecordMetadata future = new FutureRecordMetadata(request, relOffset,
-                                                               RecordBatch.NO_TIMESTAMP, 0, 0, 0);
+                RecordBatch.NO_TIMESTAMP, 0L, 0, 0);
         assertFalse("Request is not completed", future.isDone());
         try {
             future.get(5, TimeUnit.MILLISECONDS);
@@ -66,7 +66,7 @@ public class RecordSendTest {
     @Test(expected = ExecutionException.class)
     public void testError() throws Exception {
         FutureRecordMetadata future = new FutureRecordMetadata(asyncRequest(baseOffset, new CorruptRecordException(), 50L),
-                                                               relOffset, RecordBatch.NO_TIMESTAMP, 0, 0, 0);
+                relOffset, RecordBatch.NO_TIMESTAMP, 0L, 0, 0);
         future.get();
     }
 
@@ -76,7 +76,7 @@ public class RecordSendTest {
     @Test
     public void testBlocking() throws Exception {
         FutureRecordMetadata future = new FutureRecordMetadata(asyncRequest(baseOffset, null, 50L),
-                                                               relOffset, RecordBatch.NO_TIMESTAMP, 0, 0, 0);
+                relOffset, RecordBatch.NO_TIMESTAMP, 0L, 0, 0);
         assertEquals(baseOffset + relOffset, future.get().offset());
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/cea319a4/clients/src/test/java/org/apache/kafka/clients/producer/internals/ProducerBatchTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/ProducerBatchTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/ProducerBatchTest.java
index fede528..da93015 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/ProducerBatchTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/ProducerBatchTest.java
@@ -18,17 +18,21 @@ package org.apache.kafka.clients.producer.internals;
 
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.record.CompressionType;
+import org.apache.kafka.common.record.LegacyRecord;
 import org.apache.kafka.common.record.MemoryRecords;
 import org.apache.kafka.common.record.MemoryRecordsBuilder;
 import org.apache.kafka.common.record.Record;
+import org.apache.kafka.common.record.RecordBatch;
 import org.apache.kafka.common.record.TimestampType;
 import org.junit.Test;
 
 import java.nio.ByteBuffer;
+import java.util.Arrays;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 
 public class ProducerBatchTest {
@@ -38,6 +42,31 @@ public class ProducerBatchTest {
     private final MemoryRecordsBuilder memoryRecordsBuilder = MemoryRecords.builder(ByteBuffer.allocate(128),
             CompressionType.NONE, TimestampType.CREATE_TIME, 128);
 
+    @Test
+    public void testChecksumNullForMagicV2() {
+        ProducerBatch batch = new ProducerBatch(new TopicPartition("topic", 1), memoryRecordsBuilder, now);
+        FutureRecordMetadata future = batch.tryAppend(now, null, new byte[10], Record.EMPTY_HEADERS, null, now);
+        assertNotNull(future);
+        assertNull(future.checksumOrNull());
+    }
+
+    @Test
+    public void testAppendedChecksumMagicV0AndV1() {
+        for (byte magic : Arrays.asList(RecordBatch.MAGIC_VALUE_V0, RecordBatch.MAGIC_VALUE_V1)) {
+            MemoryRecordsBuilder builder = MemoryRecords.builder(ByteBuffer.allocate(128), magic,
+                    CompressionType.NONE, TimestampType.CREATE_TIME, 128);
+            ProducerBatch batch = new ProducerBatch(new TopicPartition("topic", 1), builder, now);
+            byte[] key = "hi".getBytes();
+            byte[] value = "there".getBytes();
+
+            FutureRecordMetadata future = batch.tryAppend(now, key, value, Record.EMPTY_HEADERS, null, now);
+            assertNotNull(future);
+            byte attributes = LegacyRecord.computeAttributes(magic, CompressionType.NONE, TimestampType.CREATE_TIME);
+            long expectedChecksum = LegacyRecord.computeChecksum(magic, attributes, now, key, value);
+            assertEquals(expectedChecksum, future.checksumOrNull().longValue());
+        }
+    }
+
     /**
      * A {@link ProducerBatch} configured using a very large linger value and a timestamp preceding its create
      * time is interpreted correctly as not expired when the linger time is larger than the difference

http://git-wip-us.apache.org/repos/asf/kafka/blob/cea319a4/clients/src/test/java/org/apache/kafka/clients/producer/internals/ProducerInterceptorsTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/ProducerInterceptorsTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/ProducerInterceptorsTest.java
index 32b3ddb..9eeb114 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/ProducerInterceptorsTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/ProducerInterceptorsTest.java
@@ -144,7 +144,7 @@ public class ProducerInterceptorsTest {
         ProducerInterceptors<Integer, String> interceptors = new ProducerInterceptors<>(interceptorList);
 
         // verify onAck is called on all interceptors
-        RecordMetadata meta = new RecordMetadata(tp, 0, 0, 0, 0, 0, 0);
+        RecordMetadata meta = new RecordMetadata(tp, 0, 0, 0, Long.valueOf(0L), 0, 0);
         interceptors.onAcknowledgement(meta, null);
         assertEquals(2, onAckCount);
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/cea319a4/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
index b9675c3..f48ab33 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
@@ -16,9 +16,6 @@
  */
 package org.apache.kafka.clients.producer.internals;
 
-import java.util.Random;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
 import org.apache.kafka.clients.ApiVersions;
 import org.apache.kafka.clients.NodeApiVersions;
 import org.apache.kafka.clients.producer.Callback;
@@ -33,8 +30,8 @@ import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.record.CompressionRatioEstimator;
 import org.apache.kafka.common.record.CompressionType;
-import org.apache.kafka.common.record.DefaultRecordBatch;
 import org.apache.kafka.common.record.DefaultRecord;
+import org.apache.kafka.common.record.DefaultRecordBatch;
 import org.apache.kafka.common.record.MutableRecordBatch;
 import org.apache.kafka.common.record.Record;
 import org.apache.kafka.common.requests.ApiVersionsResponse;
@@ -52,7 +49,10 @@ import java.util.Deque;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Random;
 import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/cea319a4/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java
index 58d4371..0922c48 100644
--- a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java
@@ -323,6 +323,22 @@ public class MemoryRecordsBuilderTest {
     }
 
     @Test
+    public void testAppendedChecksumConsistency() {
+        ByteBuffer buffer = ByteBuffer.allocate(512);
+        for (byte magic : Arrays.asList(RecordBatch.MAGIC_VALUE_V0, RecordBatch.MAGIC_VALUE_V1, RecordBatch.MAGIC_VALUE_V2)) {
+            MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, magic, compressionType,
+                    TimestampType.CREATE_TIME, 0L, LegacyRecord.NO_TIMESTAMP, RecordBatch.NO_PRODUCER_ID,
+                    RecordBatch.NO_PRODUCER_EPOCH, RecordBatch.NO_SEQUENCE, false, false,
+                    RecordBatch.NO_PARTITION_LEADER_EPOCH, buffer.capacity());
+            Long checksumOrNull = builder.append(1L, "key".getBytes(), "value".getBytes());
+            MemoryRecords memoryRecords = builder.build();
+            List<Record> records = TestUtils.toList(memoryRecords.records());
+            assertEquals(1, records.size());
+            assertEquals(checksumOrNull, records.get(0).checksumOrNull());
+        }
+    }
+
+    @Test
     public void testSmallWriteLimit() {
         // with a small write limit, we always allow at least one record to be added
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/cea319a4/clients/src/test/java/org/apache/kafka/test/MockConsumerInterceptor.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/test/MockConsumerInterceptor.java b/clients/src/test/java/org/apache/kafka/test/MockConsumerInterceptor.java
index 08c8e74..3fcc157 100644
--- a/clients/src/test/java/org/apache/kafka/test/MockConsumerInterceptor.java
+++ b/clients/src/test/java/org/apache/kafka/test/MockConsumerInterceptor.java
@@ -56,6 +56,7 @@ public class MockConsumerInterceptor implements ClusterResourceListener, Consume
     }
 
     @Override
+    @SuppressWarnings("deprecation")
     public ConsumerRecords<String, String> onConsume(ConsumerRecords<String, String> records) {
 
         // This will ensure that we get the cluster metadata when onConsume is called for the first time
@@ -99,4 +100,4 @@ public class MockConsumerInterceptor implements ClusterResourceListener, Consume
     public void onUpdate(ClusterResource clusterResource) {
         CLUSTER_META.set(clusterResource);
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/cea319a4/core/src/main/scala/kafka/tools/DumpLogSegments.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/DumpLogSegments.scala b/core/src/main/scala/kafka/tools/DumpLogSegments.scala
index 4d35a85..f0c41c7 100755
--- a/core/src/main/scala/kafka/tools/DumpLogSegments.scala
+++ b/core/src/main/scala/kafka/tools/DumpLogSegments.scala
@@ -353,11 +353,13 @@ object DumpLogSegments {
           print("offset: " + record.offset + " position: " + validBytes +
             " " + batch.timestampType + ": " + record.timestamp + " isvalid: " + record.isValid +
             " keysize: " + record.keySize + " valuesize: " + record.valueSize + " magic: " + batch.magic +
-            " compresscodec: " + batch.compressionType + " crc: " + record.checksum)
+            " compresscodec: " + batch.compressionType)
 
           if (batch.magic >= RecordBatch.MAGIC_VALUE_V2) {
-            print(" sequence: " + record.sequence +
+            print(" crc: " + batch.checksum + " sequence: " + record.sequence +
               " headerKeys: " + record.headers.map(_.key).mkString("[", ",", "]"))
+          } else {
+            print(" crc: " + record.checksumOrNull)
           }
 
           if (batch.isControlBatch) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/cea319a4/docs/upgrade.html
----------------------------------------------------------------------
diff --git a/docs/upgrade.html b/docs/upgrade.html
index a2d83a6..dab5fa7 100644
--- a/docs/upgrade.html
+++ b/docs/upgrade.html
@@ -68,7 +68,7 @@
         individual messages is only reduced by the overhead of the batch format. This similarly affects the
         producer's <code>batch.size</code> configuration.</li>
     <li>GC log rotation is enabled by default, see KAFKA-3754 for details.</li>
-    <li>Deprecated constructors of MetricName and Cluster classes have been removed.</li>
+    <li>Deprecated constructors of RecordMetadata, MetricName and Cluster classes have been removed.</li>
     <li>Added user headers support through a new Headers interface providing user headers read and write access.</li>
     <li>ProducerRecord and ConsumerRecord expose the new Headers API via <code>Headers headers()</code> method call.</li>
     <li>ExtendedSerializer and ExtendedDeserializer interfaces are introduced to support serialization and deserialization for headers. Headers will be ignored if the configured serializer and deserializer are not the above classes.</li>