Posted to commits@kafka.apache.org by ju...@apache.org on 2016/02/19 16:56:52 UTC

[5/5] kafka git commit: KAFKA-3025; Added timestamp to Message and use relative offset.

KAFKA-3025; Added timestamp to Message and use relative offset.

See KIP-31 and KIP-32 for details.

A few notes on the patch:
1. This patch implements KIP-31 and KIP-32. It includes the features in KAFKA-3025, KAFKA-3026 and KAFKA-3036.
2. All unit tests passed.
3. The unit tests were run with both the new and the old message formats.
4. When message format conversion occurs during consumption, the consumer will not be able to detect that a message exceeds the maximum message size. I did not try to fix this because the situation seems rare and only happens during the migration phase.
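
(For reference, a minimal sketch of what the new timestamp fields look like at the client API level; the topic name, key and value below are placeholders, but the constructors and accessors are the ones added by this patch.)

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.record.TimestampType;

    public class TimestampApiSketch {
        public static void main(String[] args) {
            // Producer side: a record may now carry an explicit create-time timestamp (ms since epoch).
            long createTime = System.currentTimeMillis();
            ProducerRecord<String, String> timestamped =
                new ProducerRecord<>("my-topic", 0, createTime, "key", "value");
            ProducerRecord<String, String> untimestamped =
                new ProducerRecord<>("my-topic", "key", "value"); // timestamp() stays null

            // Consumer side: every record exposes its timestamp and the timestamp type
            // (CREATE_TIME or LOG_APPEND_TIME, depending on the topic configuration).
            ConsumerRecord<String, String> received = new ConsumerRecord<>(
                "my-topic", 0, 42L, createTime, TimestampType.CREATE_TIME, "key", "value");
            System.out.println(received.timestamp() + " (" + received.timestampType() + ")");
            System.out.println(timestamped.timestamp() + " vs " + untimestamped.timestamp());
        }
    }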

Author: Jiangjie Qin <be...@gmail.com>
Author: Ismael Juma <is...@juma.me.uk>
Author: Jiangjie (Becket) Qin <be...@gmail.com>

Reviewers: Jason Gustafson <ja...@confluent.io>, Anna Povzner <an...@confluent.io>, Ismael Juma <is...@juma.me.uk>, Guozhang Wang <wa...@gmail.com>, Jun Rao <ju...@gmail.com>

Closes #764 from becketqin/KAFKA-3025


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/45c8195f
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/45c8195f
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/45c8195f

Branch: refs/heads/trunk
Commit: 45c8195fa14c766b200c720f316836dbb84e9d8b
Parents: eee9522
Author: Jiangjie Qin <be...@gmail.com>
Authored: Fri Feb 19 07:56:40 2016 -0800
Committer: Jun Rao <ju...@gmail.com>
Committed: Fri Feb 19 07:56:40 2016 -0800

----------------------------------------------------------------------
 checkstyle/import-control.xml                   |   1 +
 .../kafka/clients/consumer/ConsumerRecord.java  |  32 +-
 .../clients/consumer/internals/Fetcher.java     |   5 +-
 .../kafka/clients/producer/KafkaProducer.java   |  11 +-
 .../kafka/clients/producer/MockProducer.java    |   5 +-
 .../kafka/clients/producer/ProducerRecord.java  |  61 ++-
 .../kafka/clients/producer/RecordMetadata.java  |  20 +-
 .../internals/FutureRecordMetadata.java         |  10 +-
 .../producer/internals/RecordAccumulator.java   |  16 +-
 .../clients/producer/internals/RecordBatch.java |  18 +-
 .../clients/producer/internals/Sender.java      |  15 +-
 .../errors/InvalidTimestampException.java       |  34 ++
 .../apache/kafka/common/protocol/Errors.java    |   5 +-
 .../apache/kafka/common/protocol/Protocol.java  |  52 ++-
 .../apache/kafka/common/record/Compressor.java  |  22 +-
 .../kafka/common/record/MemoryRecords.java      | 115 +++--
 .../org/apache/kafka/common/record/Record.java  | 146 ++++--
 .../kafka/common/record/TimestampType.java      |  62 +++
 .../kafka/common/requests/ProduceRequest.java   |   6 +-
 .../kafka/common/requests/ProduceResponse.java  |  40 +-
 .../org/apache/kafka/common/utils/Crc32.java    |  11 +
 .../clients/consumer/MockConsumerTest.java      |   5 +-
 .../internals/ConsumerInterceptorsTest.java     |   8 +-
 .../clients/consumer/internals/FetcherTest.java |  17 +-
 .../clients/producer/ProducerRecordTest.java    |   2 +-
 .../kafka/clients/producer/RecordSendTest.java  |   7 +-
 .../internals/ProducerInterceptorsTest.java     |   2 +-
 .../internals/RecordAccumulatorTest.java        |  30 +-
 .../clients/producer/internals/SenderTest.java  |  11 +-
 .../kafka/common/record/MemoryRecordsTest.java  |   8 +-
 .../apache/kafka/common/record/RecordTest.java  |  16 +-
 .../common/requests/RequestResponseTest.java    |  14 +-
 .../kafka/test/MockConsumerInterceptor.java     |   4 +-
 .../connect/runtime/WorkerSinkTaskTest.java     |   3 +-
 .../runtime/WorkerSinkTaskThreadedTest.java     |   5 +-
 .../connect/runtime/WorkerSourceTaskTest.java   |   2 +-
 .../connect/storage/KafkaConfigStorageTest.java |  25 +-
 .../storage/KafkaOffsetBackingStoreTest.java    |  17 +-
 .../kafka/connect/util/KafkaBasedLogTest.java   |  17 +-
 .../main/scala/kafka/admin/ConfigCommand.scala  |  18 +-
 .../main/scala/kafka/admin/TopicCommand.scala   |  27 +-
 core/src/main/scala/kafka/api/ApiVersion.scala  |  42 +-
 .../src/main/scala/kafka/api/FetchRequest.scala |   3 +-
 .../main/scala/kafka/api/ProducerRequest.scala  |   4 +-
 .../main/scala/kafka/api/ProducerResponse.scala |  14 +-
 .../main/scala/kafka/common/ErrorMapping.scala  |   1 +
 .../scala/kafka/consumer/BaseConsumer.scala     |  26 +-
 .../scala/kafka/consumer/ConsumerIterator.scala |   9 +-
 .../controller/ControllerChannelManager.scala   |   4 +-
 .../kafka/coordinator/GroupCoordinator.scala    |  36 +-
 .../coordinator/GroupMetadataManager.scala      |  44 +-
 .../main/scala/kafka/log/FileMessageSet.scala   |  63 ++-
 core/src/main/scala/kafka/log/Log.scala         |  33 +-
 core/src/main/scala/kafka/log/LogCleaner.scala  |  53 ++-
 core/src/main/scala/kafka/log/LogConfig.scala   |  39 +-
 core/src/main/scala/kafka/log/LogSegment.scala  |   2 +-
 .../kafka/message/ByteBufferMessageSet.scala    | 447 ++++++++++++++++---
 core/src/main/scala/kafka/message/Message.scala | 259 +++++++++--
 .../kafka/message/MessageAndMetadata.scala      |  11 +-
 .../main/scala/kafka/message/MessageSet.scala   |  25 +-
 .../scala/kafka/message/MessageWriter.scala     |  27 +-
 .../producer/async/DefaultEventHandler.scala    |  23 +-
 .../kafka/server/AbstractFetcherThread.scala    |   2 +-
 .../main/scala/kafka/server/ConfigHandler.scala |  14 +-
 .../src/main/scala/kafka/server/KafkaApis.scala |  46 +-
 .../main/scala/kafka/server/KafkaConfig.scala   |  25 +-
 .../main/scala/kafka/server/KafkaServer.scala   |  11 +-
 .../kafka/server/ReplicaFetcherThread.scala     |   7 +-
 .../scala/kafka/server/ReplicaManager.scala     |  21 +-
 .../scala/kafka/tools/ConsoleConsumer.scala     |  33 +-
 .../scala/kafka/tools/DumpLogSegments.scala     |   2 +-
 .../main/scala/kafka/tools/MirrorMaker.scala    |  16 +-
 .../scala/kafka/tools/ReplayLogProducer.scala   |   4 +-
 .../scala/kafka/tools/SimpleConsumerShell.scala |   3 +-
 .../kafka/api/BaseConsumerTest.scala            |  42 +-
 .../kafka/api/BaseProducerSendTest.scala        | 180 ++++++--
 .../kafka/api/PlaintextConsumerTest.scala       | 113 +++--
 .../kafka/api/ProducerCompressionTest.scala     |   5 +-
 .../api/RequestResponseSerializationTest.scala  |   1 +
 .../GroupCoordinatorResponseTest.scala          |  13 +-
 .../test/scala/unit/kafka/log/CleanerTest.scala |  13 +-
 .../unit/kafka/log/FileMessageSetTest.scala     |  68 +++
 .../kafka/log/LogCleanerIntegrationTest.scala   |   2 +-
 .../scala/unit/kafka/log/LogConfigTest.scala    |   3 +-
 .../scala/unit/kafka/log/LogManagerTest.scala   |   1 +
 .../src/test/scala/unit/kafka/log/LogTest.scala |  18 +-
 .../message/ByteBufferMessageSetTest.scala      | 260 ++++++++++-
 .../kafka/message/MessageCompressionTest.scala  |  10 +-
 .../scala/unit/kafka/message/MessageTest.scala  |  79 +++-
 .../unit/kafka/message/MessageWriterTest.scala  |   9 +-
 .../unit/kafka/producer/AsyncProducerTest.scala |  15 +-
 .../unit/kafka/producer/ProducerTest.scala      |  31 +-
 .../unit/kafka/producer/SyncProducerTest.scala  |  77 ++--
 .../unit/kafka/server/EdgeCaseRequestTest.scala |   2 +-
 .../unit/kafka/server/KafkaConfigTest.scala     |  16 +-
 .../scala/unit/kafka/server/LogOffsetTest.scala |  19 +-
 .../unit/kafka/tools/ConsoleConsumerTest.scala  |   4 +-
 .../test/scala/unit/kafka/utils/TestUtils.scala |   7 +-
 docs/upgrade.html                               |  52 +++
 .../processor/internals/RecordQueue.java        |   4 +-
 .../processor/internals/PartitionGroupTest.java |  13 +-
 .../internals/ProcessorStateManagerTest.java    |   8 +-
 .../processor/internals/RecordQueueTest.java    |  19 +-
 .../processor/internals/StandbyTaskTest.java    |  19 +-
 .../processor/internals/StreamTaskTest.java     |  31 +-
 .../kafka/test/ProcessorTopologyTestDriver.java |   3 +-
 106 files changed, 2688 insertions(+), 723 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/45c8195f/checkstyle/import-control.xml
----------------------------------------------------------------------
diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index a663cf7..b183b3d 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -84,6 +84,7 @@
     <subpackage name="requests">
       <allow pkg="org.apache.kafka.common.protocol" />
       <allow pkg="org.apache.kafka.common.network" />
+      <allow pkg="org.apache.kafka.common.record" />
       <!-- for testing -->
       <allow pkg="org.apache.kafka.common.errors" />
     </subpackage>

http://git-wip-us.apache.org/repos/asf/kafka/blob/45c8195f/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecord.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecord.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecord.java
index d4668c2..42e0a90 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecord.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecord.java
@@ -12,6 +12,8 @@
  */
 package org.apache.kafka.clients.consumer;
 
+import org.apache.kafka.common.record.TimestampType;
+
 /**
  * A key/value pair to be received from Kafka. This consists of a topic name and a partition number, from which the
  * record is being received and an offset that points to the record in a Kafka partition.
@@ -20,6 +22,8 @@ public final class ConsumerRecord<K, V> {
     private final String topic;
     private final int partition;
     private final long offset;
+    private final long timestamp;
+    private final TimestampType timestampType;
     private final K key;
     private final V value;
 
@@ -29,15 +33,25 @@ public final class ConsumerRecord<K, V> {
      * @param topic The topic this record is received from
      * @param partition The partition of the topic this record is received from
      * @param offset The offset of this record in the corresponding Kafka partition
+     * @param timestamp The timestamp of the record.
+     * @param timestampType The timestamp type
      * @param key The key of the record, if one exists (null is allowed)
      * @param value The record contents
      */
-    public ConsumerRecord(String topic, int partition, long offset, K key, V value) {
+    public ConsumerRecord(String topic,
+                          int partition,
+                          long offset,
+                          long timestamp,
+                          TimestampType timestampType,
+                          K key,
+                          V value) {
         if (topic == null)
             throw new IllegalArgumentException("Topic cannot be null");
         this.topic = topic;
         this.partition = partition;
         this.offset = offset;
+        this.timestamp = timestamp;
+        this.timestampType = timestampType;
         this.key = key;
         this.value = value;
     }
@@ -77,9 +91,23 @@ public final class ConsumerRecord<K, V> {
         return offset;
     }
 
+    /**
+     * The timestamp of this record
+     */
+    public long timestamp() {
+        return timestamp;
+    }
+
+    /**
+     * The timestamp type of this record
+     */
+    public TimestampType timestampType() {
+        return timestampType;
+    }
+
     @Override
     public String toString() {
         return "ConsumerRecord(topic = " + topic() + ", partition = " + partition() + ", offset = " + offset()
-                + ", key = " + key + ", value = " + value + ")";
+                + ", " + timestampType + " = " + timestamp + ", key = " + key + ", value = " + value + ")";
     }
 }
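
A usage sketch (not part of the patch): a plain consumer loop can read the new fields roughly as below. The bootstrap server, group id and topic name are placeholders.

    import java.util.Collections;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    public class TimestampConsumerSketch {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");
            props.put("group.id", "timestamp-demo");
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
                consumer.subscribe(Collections.singletonList("my-topic"));
                ConsumerRecords<String, String> records = consumer.poll(1000);
                for (ConsumerRecord<String, String> record : records) {
                    // timestampType() says whether the broker kept the producer's CreateTime
                    // or overwrote it with its own LogAppendTime for this topic.
                    System.out.printf("offset=%d %s=%d key=%s%n",
                        record.offset(), record.timestampType(), record.timestamp(), record.key());
                }
            }
        }
    }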

http://git-wip-us.apache.org/repos/asf/kafka/blob/45c8195f/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
index e8f1f55..427664a 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
@@ -40,6 +40,7 @@ import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.record.LogEntry;
 import org.apache.kafka.common.record.MemoryRecords;
+import org.apache.kafka.common.record.TimestampType;
 import org.apache.kafka.common.requests.FetchRequest;
 import org.apache.kafka.common.requests.FetchResponse;
 import org.apache.kafka.common.requests.ListOffsetRequest;
@@ -614,12 +615,14 @@ public class Fetcher<K, V> {
             if (this.checkCrcs)
                 logEntry.record().ensureValid();
             long offset = logEntry.offset();
+            long timestamp = logEntry.record().timestamp();
+            TimestampType timestampType = logEntry.record().timestampType();
             ByteBuffer keyBytes = logEntry.record().key();
             K key = keyBytes == null ? null : this.keyDeserializer.deserialize(partition.topic(), Utils.toArray(keyBytes));
             ByteBuffer valueBytes = logEntry.record().value();
             V value = valueBytes == null ? null : this.valueDeserializer.deserialize(partition.topic(), Utils.toArray(valueBytes));
 
-            return new ConsumerRecord<>(partition.topic(), partition.partition(), offset, key, value);
+            return new ConsumerRecord<>(partition.topic(), partition.partition(), offset, timestamp, timestampType, key, value);
         } catch (KafkaException e) {
             throw e;
         } catch (RuntimeException e) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/45c8195f/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index a76dc1a..a066512 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -359,8 +359,12 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
      * records waiting to be sent. This allows sending many records in parallel without blocking to wait for the
      * response after each one.
      * <p>
-     * The result of the send is a {@link RecordMetadata} specifying the partition the record was sent to and the offset
-     * it was assigned.
+     * The result of the send is a {@link RecordMetadata} specifying the partition the record was sent to, the offset
+     * it was assigned and the timestamp of the record. If
+     * {@link org.apache.kafka.common.record.TimestampType#CREATE_TIME CreateTime} is used by the topic, the timestamp
+     * will be the user provided timestamp or the record send time if the user did not specify a timestamp for the
+     * record. If {@link org.apache.kafka.common.record.TimestampType#LOG_APPEND_TIME LogAppendTime} is used for the
+     * topic, the timestamp will be the Kafka broker local time when the message is appended.
      * <p>
      * Since the send call is asynchronous it returns a {@link java.util.concurrent.Future Future} for the
      * {@link RecordMetadata} that will be assigned to this record. Invoking {@link java.util.concurrent.Future#get()
@@ -456,8 +460,9 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
             int serializedSize = Records.LOG_OVERHEAD + Record.recordSize(serializedKey, serializedValue);
             ensureValidRecordSize(serializedSize);
             TopicPartition tp = new TopicPartition(record.topic(), partition);
+            long timestamp = record.timestamp() == null ? time.milliseconds() : record.timestamp();
             log.trace("Sending record {} with callback {} to topic {} partition {}", record, callback, record.topic(), partition);
-            RecordAccumulator.RecordAppendResult result = accumulator.append(tp, serializedKey, serializedValue, callback, remainingWaitMs);
+            RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey, serializedValue, callback, remainingWaitMs);
             if (result.batchIsFull || result.newBatchCreated) {
                 log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition);
                 this.sender.wakeup();
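
As a hedged usage sketch of the javadoc above (broker address, topic and serializers are placeholders): the timestamp reported back in RecordMetadata is either the create time set on the record or the broker's append time, depending on the topic's timestamp type.

    import java.util.Properties;
    import org.apache.kafka.clients.producer.Callback;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.clients.producer.RecordMetadata;

    public class TimestampProducerSketch {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");
            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
                ProducerRecord<String, String> record =
                    new ProducerRecord<>("my-topic", null, System.currentTimeMillis(), "key", "value");
                producer.send(record, new Callback() {
                    @Override
                    public void onCompletion(RecordMetadata metadata, Exception exception) {
                        if (exception == null)
                            // CreateTime topic: echoes the timestamp set above.
                            // LogAppendTime topic: the broker's local append time.
                            System.out.println("offset=" + metadata.offset()
                                + " timestamp=" + metadata.timestamp());
                    }
                });
            } // close() flushes pending sends, so the callback fires before the program exits
        }
    }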

http://git-wip-us.apache.org/repos/asf/kafka/blob/45c8195f/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java
index 8388ab8..5f97bae 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java
@@ -34,6 +34,7 @@ import org.apache.kafka.common.Metric;
 import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.record.Record;
 import org.apache.kafka.common.serialization.Serializer;
 
 
@@ -116,10 +117,10 @@ public class MockProducer<K, V> implements Producer<K, V> {
         if (this.cluster.partitionsForTopic(record.topic()) != null)
             partition = partition(record, this.cluster);
         ProduceRequestResult result = new ProduceRequestResult();
-        FutureRecordMetadata future = new FutureRecordMetadata(result, 0);
+        FutureRecordMetadata future = new FutureRecordMetadata(result, 0, Record.NO_TIMESTAMP);
         TopicPartition topicPartition = new TopicPartition(record.topic(), partition);
         long offset = nextOffset(topicPartition);
-        Completion completion = new Completion(topicPartition, offset, new RecordMetadata(topicPartition, 0, offset), result, callback);
+        Completion completion = new Completion(topicPartition, offset, new RecordMetadata(topicPartition, 0, offset, Record.NO_TIMESTAMP), result, callback);
         this.sent.add(record);
         if (autoComplete)
             completion.complete(null);

http://git-wip-us.apache.org/repos/asf/kafka/blob/45c8195f/clients/src/main/java/org/apache/kafka/clients/producer/ProducerRecord.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerRecord.java b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerRecord.java
index 75cd51e..85b4d8d 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerRecord.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerRecord.java
@@ -19,6 +19,22 @@ package org.apache.kafka.clients.producer;
  * If a valid partition number is specified that partition will be used when sending the record. If no partition is
  * specified but a key is present a partition will be chosen using a hash of the key. If neither key nor partition is
  * present a partition will be assigned in a round-robin fashion.
+ * <p>
+ * The record also has an associated timestamp. If the user did not provide a timestamp, the producer will stamp the
+ * record with its current time. The timestamp eventually used by Kafka depends on the timestamp type configured for
+ * the topic.
+ * <li>
+ * If the topic is configured to use {@link org.apache.kafka.common.record.TimestampType#CREATE_TIME CreateTime},
+ * the timestamp in the producer record will be used by the broker.
+ * </li>
+ * <li>
+ * If the topic is configured to use {@link org.apache.kafka.common.record.TimestampType#LOG_APPEND_TIME LogAppendTime},
+ * the timestamp in the producer record will be overwritten by the broker with the broker local time when it appends the
+ * message to its log.
+ * </li>
+ * <p>
+ * In either of the cases above, the timestamp that has actually been used will be returned to user in
+ * {@link RecordMetadata}
  */
 public final class ProducerRecord<K, V> {
 
@@ -26,22 +42,39 @@ public final class ProducerRecord<K, V> {
     private final Integer partition;
     private final K key;
     private final V value;
+    private final Long timestamp;
 
     /**
-     * Creates a record to be sent to a specified topic and partition
+     * Creates a record with a specified timestamp to be sent to a specified topic and partition
      * 
      * @param topic The topic the record will be appended to
      * @param partition The partition to which the record should be sent
+     * @param timestamp The timestamp of the record
      * @param key The key that will be included in the record
      * @param value The record contents
      */
-    public ProducerRecord(String topic, Integer partition, K key, V value) {
+    public ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value) {
         if (topic == null)
             throw new IllegalArgumentException("Topic cannot be null");
+        if (timestamp != null && timestamp < 0)
+            throw new IllegalArgumentException("Invalid timestamp " + timestamp);
         this.topic = topic;
         this.partition = partition;
         this.key = key;
         this.value = value;
+        this.timestamp = timestamp;
+    }
+
+    /**
+     * Creates a record to be sent to a specified topic and partition
+     *
+     * @param topic The topic the record will be appended to
+     * @param partition The partition to which the record should be sent
+     * @param key The key that will be included in the record
+     * @param value The record contents
+     */
+    public ProducerRecord(String topic, Integer partition, K key, V value) {
+        this(topic, partition, null, key, value);
     }
 
     /**
@@ -52,7 +85,7 @@ public final class ProducerRecord<K, V> {
      * @param value The record contents
      */
     public ProducerRecord(String topic, K key, V value) {
-        this(topic, null, key, value);
+        this(topic, null, null, key, value);
     }
 
     /**
@@ -62,18 +95,18 @@ public final class ProducerRecord<K, V> {
      * @param value The record contents
      */
     public ProducerRecord(String topic, V value) {
-        this(topic, null, value);
+        this(topic, null, null, null, value);
     }
 
     /**
-     * The topic this record is being sent to
+     * @return The topic this record is being sent to
      */
     public String topic() {
         return topic;
     }
 
     /**
-     * The key (or null if no key is specified)
+     * @return The key (or null if no key is specified)
      */
     public K key() {
         return key;
@@ -87,7 +120,14 @@ public final class ProducerRecord<K, V> {
     }
 
     /**
-     * The partition to which the record will be sent (or null if no partition was specified)
+     * @return The timestamp
+     */
+    public Long timestamp() {
+        return timestamp;
+    }
+
+    /**
+     * @return The partition to which the record will be sent (or null if no partition was specified)
      */
     public Integer partition() {
         return partition;
@@ -97,7 +137,9 @@ public final class ProducerRecord<K, V> {
     public String toString() {
         String key = this.key == null ? "null" : this.key.toString();
         String value = this.value == null ? "null" : this.value.toString();
-        return "ProducerRecord(topic=" + topic + ", partition=" + partition + ", key=" + key + ", value=" + value;
+        String timestamp = this.timestamp == null ? "null" : this.timestamp.toString();
+        return "ProducerRecord(topic=" + topic + ", partition=" + partition + ", key=" + key + ", value=" + value +
+            ", timestamp=" + timestamp + ")";
     }
 
     @Override
@@ -117,6 +159,8 @@ public final class ProducerRecord<K, V> {
             return false;
         else if (value != null ? !value.equals(that.value) : that.value != null) 
             return false;
+        else if (timestamp != null ? !timestamp.equals(that.timestamp) : that.timestamp != null)
+            return false;
 
         return true;
     }
@@ -127,6 +171,7 @@ public final class ProducerRecord<K, V> {
         result = 31 * result + (partition != null ? partition.hashCode() : 0);
         result = 31 * result + (key != null ? key.hashCode() : 0);
         result = 31 * result + (value != null ? value.hashCode() : 0);
+        result = 31 * result + (timestamp != null ? timestamp.hashCode() : 0);
         return result;
     }
 }
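
A small illustrative sketch of the new constructor's contract (topic, key and value are placeholders): an omitted timestamp stays null until the producer stamps the record at send time, and negative timestamps are rejected immediately.

    import org.apache.kafka.clients.producer.ProducerRecord;

    public class ProducerRecordTimestampSketch {
        public static void main(String[] args) {
            ProducerRecord<String, String> noTimestamp = new ProducerRecord<>("my-topic", "k", "v");
            System.out.println(noTimestamp.timestamp()); // null: filled in by the producer at send time

            try {
                new ProducerRecord<String, String>("my-topic", null, -1L, "k", "v");
            } catch (IllegalArgumentException e) {
                System.out.println(e.getMessage()); // "Invalid timestamp -1"
            }
        }
    }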

http://git-wip-us.apache.org/repos/asf/kafka/blob/45c8195f/clients/src/main/java/org/apache/kafka/clients/producer/RecordMetadata.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/RecordMetadata.java b/clients/src/main/java/org/apache/kafka/clients/producer/RecordMetadata.java
index a80f6b9..d9ea239 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/RecordMetadata.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/RecordMetadata.java
@@ -24,18 +24,25 @@ import org.apache.kafka.common.TopicPartition;
 public final class RecordMetadata {
 
     private final long offset;
+    // The timestamp of the message.
+    // If LogAppendTime is used for the topic, the timestamp will be the timestamp returned by the broker.
+    // If CreateTime is used for the topic, the timestamp is the timestamp in the corresponding ProducerRecord if the
+    // user provided one. Otherwise, it will be the producer local time when the producer record was handed to the
+    // producer.
+    private final long timestamp;
     private final TopicPartition topicPartition;
 
-    private RecordMetadata(TopicPartition topicPartition, long offset) {
+    private RecordMetadata(TopicPartition topicPartition, long offset, long timestamp) {
         super();
         this.offset = offset;
+        this.timestamp = timestamp;
         this.topicPartition = topicPartition;
     }
 
-    public RecordMetadata(TopicPartition topicPartition, long baseOffset, long relativeOffset) {
+    public RecordMetadata(TopicPartition topicPartition, long baseOffset, long relativeOffset, long timestamp) {
         // ignore the relativeOffset if the base offset is -1,
         // since this indicates the offset is unknown
-        this(topicPartition, baseOffset == -1 ? baseOffset : baseOffset + relativeOffset);
+        this(topicPartition, baseOffset == -1 ? baseOffset : baseOffset + relativeOffset, timestamp);
     }
 
     /**
@@ -46,6 +53,13 @@ public final class RecordMetadata {
     }
 
     /**
+     * The timestamp of the record in the topic/partition.
+     */
+    public long timestamp() {
+        return timestamp;
+    }
+
+    /**
      * The topic the record was appended to
      */
     public String topic() {

http://git-wip-us.apache.org/repos/asf/kafka/blob/45c8195f/clients/src/main/java/org/apache/kafka/clients/producer/internals/FutureRecordMetadata.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/FutureRecordMetadata.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/FutureRecordMetadata.java
index e2d9ca8..a140371 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/FutureRecordMetadata.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/FutureRecordMetadata.java
@@ -26,10 +26,12 @@ public final class FutureRecordMetadata implements Future<RecordMetadata> {
 
     private final ProduceRequestResult result;
     private final long relativeOffset;
+    private final long timestamp;
 
-    public FutureRecordMetadata(ProduceRequestResult result, long relativeOffset) {
+    public FutureRecordMetadata(ProduceRequestResult result, long relativeOffset, long timestamp) {
         this.result = result;
         this.relativeOffset = relativeOffset;
+        this.timestamp = timestamp;
     }
 
     @Override
@@ -59,13 +61,17 @@ public final class FutureRecordMetadata implements Future<RecordMetadata> {
     }
     
     RecordMetadata value() {
-        return new RecordMetadata(result.topicPartition(), this.result.baseOffset(), this.relativeOffset);
+        return new RecordMetadata(result.topicPartition(), this.result.baseOffset(), this.relativeOffset, this.timestamp);
     }
     
     public long relativeOffset() {
         return this.relativeOffset;
     }
 
+    public long timestamp() {
+        return this.timestamp;
+    }
+
     @Override
     public boolean isCancelled() {
         return false;

http://git-wip-us.apache.org/repos/asf/kafka/blob/45c8195f/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
index 3c710c8..f1414f0 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
@@ -146,12 +146,18 @@ public final class RecordAccumulator {
      * <p>
      *
      * @param tp The topic/partition to which this record is being sent
+     * @param timestamp The timestamp of the record
      * @param key The key for the record
      * @param value The value for the record
      * @param callback The user-supplied callback to execute when the request is complete
      * @param maxTimeToBlock The maximum time in milliseconds to block for buffer memory to be available
      */
-    public RecordAppendResult append(TopicPartition tp, byte[] key, byte[] value, Callback callback, long maxTimeToBlock) throws InterruptedException {
+    public RecordAppendResult append(TopicPartition tp,
+                                     long timestamp,
+                                     byte[] key,
+                                     byte[] value,
+                                     Callback callback,
+                                     long maxTimeToBlock) throws InterruptedException {
         // We keep track of the number of appending thread to make sure we do not miss batches in
         // abortIncompleteBatches().
         appendsInProgress.incrementAndGet();
@@ -163,7 +169,7 @@ public final class RecordAccumulator {
                     throw new IllegalStateException("Cannot send after the producer is closed.");
                 RecordBatch last = dq.peekLast();
                 if (last != null) {
-                    FutureRecordMetadata future = last.tryAppend(key, value, callback, time.milliseconds());
+                    FutureRecordMetadata future = last.tryAppend(timestamp, key, value, callback, time.milliseconds());
                     if (future != null)
                         return new RecordAppendResult(future, dq.size() > 1 || last.records.isFull(), false);
                 }
@@ -179,7 +185,7 @@ public final class RecordAccumulator {
                     throw new IllegalStateException("Cannot send after the producer is closed.");
                 RecordBatch last = dq.peekLast();
                 if (last != null) {
-                    FutureRecordMetadata future = last.tryAppend(key, value, callback, time.milliseconds());
+                    FutureRecordMetadata future = last.tryAppend(timestamp, key, value, callback, time.milliseconds());
                     if (future != null) {
                         // Somebody else found us a batch, return the one we waited for! Hopefully this doesn't happen often...
                         free.deallocate(buffer);
@@ -188,7 +194,7 @@ public final class RecordAccumulator {
                 }
                 MemoryRecords records = MemoryRecords.emptyRecords(buffer, compression, this.batchSize);
                 RecordBatch batch = new RecordBatch(tp, records, time.milliseconds());
-                FutureRecordMetadata future = Utils.notNull(batch.tryAppend(key, value, callback, time.milliseconds()));
+                FutureRecordMetadata future = Utils.notNull(batch.tryAppend(timestamp, key, value, callback, time.milliseconds()));
 
                 dq.addLast(batch);
                 incomplete.add(batch);
@@ -454,7 +460,7 @@ public final class RecordAccumulator {
                 batch.records.close();
                 dq.remove(batch);
             }
-            batch.done(-1L, new IllegalStateException("Producer is closed forcefully."));
+            batch.done(-1L, Record.NO_TIMESTAMP, new IllegalStateException("Producer is closed forcefully."));
             deallocate(batch);
         }
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/45c8195f/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java
index 3f18582..af9095d 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java
@@ -44,6 +44,7 @@ public final class RecordBatch {
     public final ProduceRequestResult produceFuture;
     public long lastAppendTime;
     private final List<Thunk> thunks;
+    private long offsetCounter = 0L;
     private boolean retry;
 
     public RecordBatch(TopicPartition tp, MemoryRecords records, long now) {
@@ -62,14 +63,14 @@ public final class RecordBatch {
      * 
      * @return The RecordSend corresponding to this record or null if there isn't sufficient room.
      */
-    public FutureRecordMetadata tryAppend(byte[] key, byte[] value, Callback callback, long now) {
+    public FutureRecordMetadata tryAppend(long timestamp, byte[] key, byte[] value, Callback callback, long now) {
         if (!this.records.hasRoomFor(key, value)) {
             return null;
         } else {
-            this.records.append(0L, key, value);
+            this.records.append(offsetCounter++, timestamp, key, value);
             this.maxRecordSize = Math.max(this.maxRecordSize, Record.recordSize(key, value));
             this.lastAppendTime = now;
-            FutureRecordMetadata future = new FutureRecordMetadata(this.produceFuture, this.recordCount);
+            FutureRecordMetadata future = new FutureRecordMetadata(this.produceFuture, this.recordCount, timestamp);
             if (callback != null)
                 thunks.add(new Thunk(callback, future));
             this.recordCount++;
@@ -81,9 +82,10 @@ public final class RecordBatch {
      * Complete the request
      * 
      * @param baseOffset The base offset of the messages assigned by the server
+     * @param timestamp The timestamp returned by the broker.
      * @param exception The exception that occurred (or null if the request was successful)
      */
-    public void done(long baseOffset, RuntimeException exception) {
+    public void done(long baseOffset, long timestamp, RuntimeException exception) {
         log.trace("Produced messages to topic-partition {} with base offset offset {} and error: {}.",
                   topicPartition,
                   baseOffset,
@@ -93,7 +95,9 @@ public final class RecordBatch {
             try {
                 Thunk thunk = this.thunks.get(i);
                 if (exception == null) {
-                    RecordMetadata metadata = new RecordMetadata(this.topicPartition,  baseOffset, thunk.future.relativeOffset());
+                    // If the timestamp returned by server is NoTimestamp, that means CreateTime is used. Otherwise LogAppendTime is used.
+                    RecordMetadata metadata = new RecordMetadata(this.topicPartition,  baseOffset, thunk.future.relativeOffset(),
+                        timestamp == Record.NO_TIMESTAMP ? thunk.future.timestamp() : timestamp);
                     thunk.callback.onCompletion(metadata, null);
                 } else {
                     thunk.callback.onCompletion(null, exception);
@@ -133,7 +137,7 @@ public final class RecordBatch {
         if ((this.records.isFull() && requestTimeout < (now - this.lastAppendTime)) || requestTimeout < (now - (this.lastAttemptMs + lingerMs))) {
             expire = true;
             this.records.close();
-            this.done(-1L, new TimeoutException("Batch Expired"));
+            this.done(-1L, Record.NO_TIMESTAMP, new TimeoutException("Batch Expired"));
         }
 
         return expire;
@@ -152,4 +156,4 @@ public final class RecordBatch {
     public void setRetry() {
         this.retry = true;
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/45c8195f/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
index aa30716..8e93973 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
@@ -41,6 +41,7 @@ import org.apache.kafka.common.metrics.stats.Max;
 import org.apache.kafka.common.metrics.stats.Rate;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.record.Record;
 import org.apache.kafka.common.requests.ProduceRequest;
 import org.apache.kafka.common.requests.ProduceResponse;
 import org.apache.kafka.common.requests.RequestSend;
@@ -243,7 +244,7 @@ public class Sender implements Runnable {
                                                                                                   .request()
                                                                                                   .destination());
             for (RecordBatch batch : batches.values())
-                completeBatch(batch, Errors.NETWORK_EXCEPTION, -1L, correlationId, now);
+                completeBatch(batch, Errors.NETWORK_EXCEPTION, -1L, Record.NO_TIMESTAMP, correlationId, now);
         } else {
             log.trace("Received produce response from node {} with correlation id {}",
                       response.request().request().destination(),
@@ -251,13 +252,12 @@ public class Sender implements Runnable {
             // if we have a response, parse it
             if (response.hasResponse()) {
                 ProduceResponse produceResponse = new ProduceResponse(response.responseBody());
-                for (Map.Entry<TopicPartition, ProduceResponse.PartitionResponse> entry : produceResponse.responses()
-                                                                                                         .entrySet()) {
+                for (Map.Entry<TopicPartition, ProduceResponse.PartitionResponse> entry : produceResponse.responses().entrySet()) {
                     TopicPartition tp = entry.getKey();
                     ProduceResponse.PartitionResponse partResp = entry.getValue();
                     Errors error = Errors.forCode(partResp.errorCode);
                     RecordBatch batch = batches.get(tp);
-                    completeBatch(batch, error, partResp.baseOffset, correlationId, now);
+                    completeBatch(batch, error, partResp.baseOffset, partResp.timestamp, correlationId, now);
                 }
                 this.sensors.recordLatency(response.request().request().destination(), response.requestLatencyMs());
                 this.sensors.recordThrottleTime(response.request().request().destination(),
@@ -265,7 +265,7 @@ public class Sender implements Runnable {
             } else {
                 // this is the acks = 0 case, just complete all requests
                 for (RecordBatch batch : batches.values())
-                    completeBatch(batch, Errors.NONE, -1L, correlationId, now);
+                    completeBatch(batch, Errors.NONE, -1L, Record.NO_TIMESTAMP, correlationId, now);
             }
         }
     }
@@ -276,10 +276,11 @@ public class Sender implements Runnable {
      * @param batch The record batch
      * @param error The error (or null if none)
      * @param baseOffset The base offset assigned to the records if successful
+     * @param timestamp The timestamp returned by the broker for this batch
      * @param correlationId The correlation id for the request
      * @param now The current POSIX time stamp in milliseconds
      */
-    private void completeBatch(RecordBatch batch, Errors error, long baseOffset, long correlationId, long now) {
+    private void completeBatch(RecordBatch batch, Errors error, long baseOffset, long timestamp, long correlationId, long now) {
         if (error != Errors.NONE && canRetry(batch, error)) {
             // retry
             log.warn("Got error produce response with correlation id {} on topic-partition {}, retrying ({} attempts left). Error: {}",
@@ -296,7 +297,7 @@ public class Sender implements Runnable {
             else
                 exception = error.exception();
             // tell the user the result of their request
-            batch.done(baseOffset, exception);
+            batch.done(baseOffset, timestamp, exception);
             this.accumulator.deallocate(batch);
             if (error != Errors.NONE)
                 this.sensors.recordErrors(batch.topicPartition.topic(), batch.recordCount);

http://git-wip-us.apache.org/repos/asf/kafka/blob/45c8195f/clients/src/main/java/org/apache/kafka/common/errors/InvalidTimestampException.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/errors/InvalidTimestampException.java b/clients/src/main/java/org/apache/kafka/common/errors/InvalidTimestampException.java
new file mode 100644
index 0000000..d2d285b
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/errors/InvalidTimestampException.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.errors;
+
+/**
+ * Indicate the timestamp of a record is invalid.
+ */
+public class InvalidTimestampException extends ApiException {
+
+    private static final long serialVersionUID = 1L;
+
+    public InvalidTimestampException(String message) {
+        super(message);
+    }
+
+    public InvalidTimestampException(String message, Throwable cause) {
+        super(message, cause);
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/45c8195f/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
index 4a20869..e7098fc 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
@@ -34,6 +34,7 @@ import org.apache.kafka.common.errors.InvalidFetchSizeException;
 import org.apache.kafka.common.errors.InvalidGroupIdException;
 import org.apache.kafka.common.errors.InvalidRequiredAcksException;
 import org.apache.kafka.common.errors.InvalidSessionTimeoutException;
+import org.apache.kafka.common.errors.InvalidTimestampException;
 import org.apache.kafka.common.errors.InvalidTopicException;
 import org.apache.kafka.common.errors.LeaderNotAvailableException;
 import org.apache.kafka.common.errors.NetworkException;
@@ -125,7 +126,9 @@ public enum Errors {
     GROUP_AUTHORIZATION_FAILED(30,
             new GroupAuthorizationException("Group authorization failed.")),
     CLUSTER_AUTHORIZATION_FAILED(31,
-            new ClusterAuthorizationException("Cluster authorization failed."));
+            new ClusterAuthorizationException("Cluster authorization failed.")),
+    INVALID_TIMESTAMP(32,
+            new InvalidTimestampException("The timestamp of the message is out of acceptable range."));
 
     private static final Logger log = LoggerFactory.getLogger(Errors.class);
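
A hedged sketch of how the new error code can surface on the producer side (assuming the broker rejects the record's timestamp, e.g. because it falls outside the range the broker accepts): the send future, and likewise the send callback, carries an InvalidTimestampException.

    import java.util.concurrent.ExecutionException;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.clients.producer.RecordMetadata;
    import org.apache.kafka.common.errors.InvalidTimestampException;

    public class InvalidTimestampHandlingSketch {
        static void sendOrReport(KafkaProducer<String, String> producer,
                                 ProducerRecord<String, String> record) throws InterruptedException {
            try {
                RecordMetadata metadata = producer.send(record).get();
                System.out.println("appended at offset " + metadata.offset());
            } catch (ExecutionException e) {
                if (e.getCause() instanceof InvalidTimestampException)
                    // Error code 32: the broker considered the record's timestamp out of range.
                    System.err.println("rejected timestamp: " + e.getCause().getMessage());
                else
                    throw new RuntimeException(e.getCause());
            }
        }
    }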
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/45c8195f/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
index 48c64c2..3787d2c 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
@@ -110,7 +110,17 @@ public class Protocol {
                                                                                                                                       INT16),
                                                                                                                             new Field("base_offset",
                                                                                                                                       INT64))))))));
+    /**
+     * The body of PRODUCE_REQUEST_V1 is the same as PRODUCE_REQUEST_V0.
+     * The version number is bumped up to indicate that the client supports quota throttle time field in the response.
+     */
     public static final Schema PRODUCE_REQUEST_V1 = PRODUCE_REQUEST_V0;
+    /**
+     * The body of PRODUCE_REQUEST_V2 is the same as PRODUCE_REQUEST_V1.
+     * The version number is bumped up to indicate that message format V1 is used which has relative offset and
+     * timestamp.
+     */
+    public static final Schema PRODUCE_REQUEST_V2 = PRODUCE_REQUEST_V1;
 
     public static final Schema PRODUCE_RESPONSE_V1 = new Schema(new Field("responses",
                                                                           new ArrayOf(new Schema(new Field("topic", STRING),
@@ -126,9 +136,33 @@ public class Protocol {
                                                                           "Duration in milliseconds for which the request was throttled" +
                                                                               " due to quota violation. (Zero if the request did not violate any quota.)",
                                                                           0));
-
-    public static final Schema[] PRODUCE_REQUEST = new Schema[] {PRODUCE_REQUEST_V0, PRODUCE_REQUEST_V1};
-    public static final Schema[] PRODUCE_RESPONSE = new Schema[] {PRODUCE_RESPONSE_V0, PRODUCE_RESPONSE_V1};
+    /**
+     * PRODUCE_RESPONSE_V2 added a timestamp field in the per partition response status.
+     * The timestamp is log append time if the topic is configured to use log append time. Or it is NoTimestamp when create
+     * time is used for the topic.
+     */
+    public static final Schema PRODUCE_RESPONSE_V2 = new Schema(new Field("responses",
+                                                                new ArrayOf(new Schema(new Field("topic", STRING),
+                                                                                       new Field("partition_responses",
+                                                                                       new ArrayOf(new Schema(new Field("partition",
+                                                                                                                        INT32),
+                                                                                                              new Field("error_code",
+                                                                                                                        INT16),
+                                                                                                              new Field("base_offset",
+                                                                                                                        INT64),
+                                                                                                              new Field("timestamp",
+                                                                                                                        INT64,
+                                                                                                                        "The timestamp returned by broker after appending the messages. " +
+                                                                                                                            "If CreateTime is used for the topic, the timestamp will be -1. " +
+                                                                                                                            "If LogAppendTime is used for the topic, the timestamp will be " +
+                                                                                                                            "the broker local time when the messages are appended."))))))),
+                                                                new Field("throttle_time_ms",
+                                                                          INT32,
+                                                                          "Duration in milliseconds for which the request was throttled" +
+                                                                              " due to quota violation. (Zero if the request did not violate any quota.)",
+                                                                          0));
+    public static final Schema[] PRODUCE_REQUEST = new Schema[] {PRODUCE_REQUEST_V0, PRODUCE_REQUEST_V1, PRODUCE_REQUEST_V2};
+    public static final Schema[] PRODUCE_RESPONSE = new Schema[] {PRODUCE_RESPONSE_V0, PRODUCE_RESPONSE_V1, PRODUCE_RESPONSE_V2};
 
     /* Offset commit api */
     public static final Schema OFFSET_COMMIT_REQUEST_PARTITION_V0 = new Schema(new Field("partition",
@@ -364,6 +398,10 @@ public class Protocol {
     // The V1 Fetch Request body is the same as V0.
     // Only the version number is incremented to indicate a newer client
     public static final Schema FETCH_REQUEST_V1 = FETCH_REQUEST_V0;
+    // The V2 Fetch Request body is the same as V1.
+    // Only the version number is incremented to indicate the client support message format V1 which uses
+    // relative offset and has timestamp.
+    public static final Schema FETCH_REQUEST_V2 = FETCH_REQUEST_V1;
     public static final Schema FETCH_RESPONSE_PARTITION_V0 = new Schema(new Field("partition",
                                                                                   INT32,
                                                                                   "Topic partition id."),
@@ -386,9 +424,13 @@ public class Protocol {
                                                                         0),
                                                               new Field("responses",
                                                                       new ArrayOf(FETCH_RESPONSE_TOPIC_V0)));
+    // Even though fetch response v2 has the same protocol as v1, the record set in the response is different. In v1,
+    // record set only includes messages of v0 (magic byte 0). In v2, record set can include messages of v0 and v1
+    // (magic byte 0 and 1). For details, see ByteBufferMessageSet.
+    public static final Schema FETCH_RESPONSE_V2 = FETCH_RESPONSE_V1;
 
-    public static final Schema[] FETCH_REQUEST = new Schema[] {FETCH_REQUEST_V0, FETCH_REQUEST_V1};
-    public static final Schema[] FETCH_RESPONSE = new Schema[] {FETCH_RESPONSE_V0, FETCH_RESPONSE_V1};
+    public static final Schema[] FETCH_REQUEST = new Schema[] {FETCH_REQUEST_V0, FETCH_REQUEST_V1, FETCH_REQUEST_V2};
+    public static final Schema[] FETCH_RESPONSE = new Schema[] {FETCH_RESPONSE_V0, FETCH_RESPONSE_V1, FETCH_RESPONSE_V2};
 
     /* List groups api */
     public static final Schema LIST_GROUPS_REQUEST_V0 = new Schema();

http://git-wip-us.apache.org/repos/asf/kafka/blob/45c8195f/clients/src/main/java/org/apache/kafka/common/record/Compressor.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/Compressor.java b/clients/src/main/java/org/apache/kafka/common/record/Compressor.java
index c7ff2e6..cde2178 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/Compressor.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/Compressor.java
@@ -89,6 +89,7 @@ public class Compressor {
     public long writtenUncompressed;
     public long numRecords;
     public float compressionRate;
+    public long maxTimestamp;
 
     public Compressor(ByteBuffer buffer, CompressionType type, int blockSize) {
         this.type = type;
@@ -97,6 +98,7 @@ public class Compressor {
         this.numRecords = 0;
         this.writtenUncompressed = 0;
         this.compressionRate = 1;
+        this.maxTimestamp = Record.NO_TIMESTAMP;
 
         if (type != CompressionType.NONE) {
             // for compressed records, leave space for the header and the shallow message metadata
@@ -136,10 +138,10 @@ public class Compressor {
             buffer.putLong(numRecords - 1);
             buffer.putInt(pos - initPos - Records.LOG_OVERHEAD);
             // write the shallow message (the crc and value size are not correct yet)
-            Record.write(buffer, null, null, type, 0, -1);
+            Record.write(buffer, maxTimestamp, null, null, type, 0, -1);
             // compute the fill the value size
             int valueSize = pos - initPos - Records.LOG_OVERHEAD - Record.RECORD_OVERHEAD;
-            buffer.putInt(initPos + Records.LOG_OVERHEAD + Record.KEY_OFFSET, valueSize);
+            buffer.putInt(initPos + Records.LOG_OVERHEAD + Record.KEY_OFFSET_V1, valueSize);
             // compute and fill the crc at the beginning of the message
             long crc = Record.computeChecksum(buffer,
                 initPos + Records.LOG_OVERHEAD + Record.MAGIC_OFFSET,
@@ -199,19 +201,21 @@ public class Compressor {
         }
     }
 
-    public void putRecord(byte[] key, byte[] value, CompressionType type, int valueOffset, int valueSize) {
+    public void putRecord(long timestamp, byte[] key, byte[] value, CompressionType type, int valueOffset, int valueSize) {
         // put a record as un-compressed into the underlying stream
-        long crc = Record.computeChecksum(key, value, type, valueOffset, valueSize);
+        long crc = Record.computeChecksum(timestamp, key, value, type, valueOffset, valueSize);
         byte attributes = Record.computeAttributes(type);
-        putRecord(crc, attributes, key, value, valueOffset, valueSize);
+        putRecord(crc, attributes, timestamp, key, value, valueOffset, valueSize);
+
     }
 
-    public void putRecord(byte[] key, byte[] value) {
-        putRecord(key, value, CompressionType.NONE, 0, -1);
+    public void putRecord(long timestamp, byte[] key, byte[] value) {
+        putRecord(timestamp, key, value, CompressionType.NONE, 0, -1);
     }
 
-    private void putRecord(final long crc, final byte attributes, final byte[] key, final byte[] value, final int valueOffset, final int valueSize) {
-        Record.write(this, crc, attributes, key, value, valueOffset, valueSize);
+    private void putRecord(final long crc, final byte attributes, final long timestamp, final byte[] key, final byte[] value, final int valueOffset, final int valueSize) {
+        maxTimestamp = Math.max(maxTimestamp, timestamp);
+        Record.write(this, crc, attributes, timestamp, key, value, valueOffset, valueSize);
     }
 
     public void recordWritten(int size) {
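
A minimal sketch of how the new Compressor API is driven (buffer size, compression type and payload below are assumptions for illustration, not part of the patch): each appended record now carries a timestamp, and the compressor tracks the maximum timestamp it has seen so that it can be stamped onto the shallow wrapper message of a compressed set.

    ByteBuffer buffer = ByteBuffer.allocate(1024);
    Compressor compressor = new Compressor(buffer, CompressionType.GZIP, 1024);
    byte[] key = "k".getBytes();
    byte[] value = "v".getBytes();
    compressor.putLong(0L);                                        // relative offset of the first inner record
    compressor.putInt(Record.recordSize(key, value));              // record size
    compressor.putRecord(System.currentTimeMillis(), key, value);  // record with timestamp
    compressor.recordWritten(Records.LOG_OVERHEAD + Record.recordSize(key, value));
    // compressor.maxTimestamp now holds the largest timestamp written so far.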

http://git-wip-us.apache.org/repos/asf/kafka/blob/45c8195f/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
index 971f0a2..01da1e2 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
@@ -16,6 +16,7 @@ import java.io.DataInputStream;
 import java.io.EOFException;
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.util.ArrayDeque;
 import java.util.Iterator;
 
 import org.apache.kafka.common.KafkaException;
@@ -88,14 +89,14 @@ public class MemoryRecords implements Records {
     /**
      * Append a new record and offset to the buffer
      */
-    public void append(long offset, byte[] key, byte[] value) {
+    public void append(long offset, long timestamp, byte[] key, byte[] value) {
         if (!writable)
             throw new IllegalStateException("Memory records is not writable");
 
         int size = Record.recordSize(key, value);
         compressor.putLong(offset);
         compressor.putInt(size);
-        compressor.putRecord(key, value);
+        compressor.putRecord(timestamp, key, value);
         compressor.recordWritten(size + Records.LOG_OVERHEAD);
     }
 
@@ -214,11 +215,50 @@ public class MemoryRecords implements Records {
         private final boolean shallow;
         private RecordsIterator innerIter;
 
+        // The variables used by the inner (deep) iterator
+        private final ArrayDeque<LogEntry> logEntries;
+        private final long absoluteBaseOffset;
+
         public RecordsIterator(ByteBuffer buffer, CompressionType type, boolean shallow) {
             this.type = type;
             this.buffer = buffer;
             this.shallow = shallow;
             this.stream = Compressor.wrapForInput(new ByteBufferInputStream(this.buffer), type);
+            this.logEntries = null;
+            this.absoluteBaseOffset = -1;
+        }
+
+        // Private constructor for inner iterator.
+        private RecordsIterator(LogEntry entry) {
+            this.type = entry.record().compressionType();
+            this.buffer = entry.record().value();
+            this.shallow = true;
+            this.stream = Compressor.wrapForInput(new ByteBufferInputStream(this.buffer), type);
+            long wrapperRecordOffset = entry.offset();
+            // If relative offset is used, we need to decompress the entire message first to compute
+            // the absolute offset.
+            if (entry.record().magic() > Record.MAGIC_VALUE_V0) {
+                this.logEntries = new ArrayDeque<>();
+                long wrapperRecordTimestamp = entry.record().timestamp();
+                while (true) {
+                    try {
+                        LogEntry logEntry = getNextEntryFromStream();
+                        Record recordWithTimestamp = new Record(logEntry.record().buffer(),
+                                                                wrapperRecordTimestamp,
+                                                                entry.record().timestampType());
+                        logEntries.add(new LogEntry(logEntry.offset(), recordWithTimestamp));
+                    } catch (EOFException e) {
+                        break;
+                    } catch (IOException e) {
+                        throw new KafkaException(e);
+                    }
+                }
+                this.absoluteBaseOffset = wrapperRecordOffset - logEntries.getLast().offset();
+            } else {
+                this.logEntries = null;
+                this.absoluteBaseOffset = -1;
+            }
+
         }
 
         /*
@@ -232,28 +272,16 @@ public class MemoryRecords implements Records {
         protected LogEntry makeNext() {
             if (innerDone()) {
                 try {
-                    // read the offset
-                    long offset = stream.readLong();
-                    // read record size
-                    int size = stream.readInt();
-                    if (size < 0)
-                        throw new IllegalStateException("Record with size " + size);
-                    // read the record, if compression is used we cannot depend on size
-                    // and hence has to do extra copy
-                    ByteBuffer rec;
-                    if (type == CompressionType.NONE) {
-                        rec = buffer.slice();
-                        int newPos = buffer.position() + size;
-                        if (newPos > buffer.limit())
-                            return allDone();
-                        buffer.position(newPos);
-                        rec.limit(size);
-                    } else {
-                        byte[] recordBuffer = new byte[size];
-                        stream.readFully(recordBuffer, 0, size);
-                        rec = ByteBuffer.wrap(recordBuffer);
+                    LogEntry entry = getNextEntry();
+                    // No more records to return.
+                    if (entry == null)
+                        return allDone();
+
+                    // Convert offset to absolute offset if needed.
+                    if (absoluteBaseOffset >= 0) {
+                        long absoluteOffset = absoluteBaseOffset + entry.offset();
+                        entry = new LogEntry(absoluteOffset, entry.record());
                     }
-                    LogEntry entry = new LogEntry(offset, new Record(rec));
 
                     // decide whether to go shallow or deep iteration if it is compressed
                     CompressionType compression = entry.record().compressionType();
@@ -264,8 +292,9 @@ public class MemoryRecords implements Records {
                         // which will de-compress the payload to a set of messages;
                         // since we assume nested compression is not allowed, the deep iterator
                         // would not try to further decompress underlying messages
-                        ByteBuffer value = entry.record().value();
-                        innerIter = new RecordsIterator(value, compression, true);
+                        // There will be at least one element in the inner iterator, so we don't
+                        // need to call hasNext() here.
+                        innerIter = new RecordsIterator(entry);
                         return innerIter.next();
                     }
                 } catch (EOFException e) {
@@ -278,6 +307,42 @@ public class MemoryRecords implements Records {
             }
         }
 
+        private LogEntry getNextEntry() throws IOException {
+            if (logEntries != null)
+                return getNextEntryFromEntryList();
+            else
+                return getNextEntryFromStream();
+        }
+
+        private LogEntry getNextEntryFromEntryList() {
+            return logEntries.isEmpty() ? null : logEntries.remove();
+        }
+
+        private LogEntry getNextEntryFromStream() throws IOException {
+            // read the offset
+            long offset = stream.readLong();
+            // read record size
+            int size = stream.readInt();
+            if (size < 0)
+                throw new IllegalStateException("Record with size " + size);
+            // read the record; if compression is used we cannot depend on size
+            // and hence have to do an extra copy
+            ByteBuffer rec;
+            if (type == CompressionType.NONE) {
+                rec = buffer.slice();
+                int newPos = buffer.position() + size;
+                if (newPos > buffer.limit())
+                    return null;
+                buffer.position(newPos);
+                rec.limit(size);
+            } else {
+                byte[] recordBuffer = new byte[size];
+                stream.readFully(recordBuffer, 0, size);
+                rec = ByteBuffer.wrap(recordBuffer);
+            }
+            return new LogEntry(offset, new Record(rec));
+        }
+
         private boolean innerDone() {
             return innerIter == null || !innerIter.hasNext();
         }
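
The arithmetic the deep iterator uses to restore absolute offsets can be sketched as follows (the concrete offsets are hypothetical): inner records of a compressed set store offsets relative to the set, and the wrapper message carries the absolute offset of the last inner record.

    long wrapperOffset = 105L;        // offset carried by the shallow wrapper entry (example value)
    long lastInnerRelative = 5L;      // relative offset of the last inner record (example value)
    long absoluteBaseOffset = wrapperOffset - lastInnerRelative;    // 100
    long innerRelative = 2L;          // relative offset stored on some inner record
    long absoluteOffset = absoluteBaseOffset + innerRelative;       // 102, the consumer-visible offset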

http://git-wip-us.apache.org/repos/asf/kafka/blob/45c8195f/clients/src/main/java/org/apache/kafka/common/record/Record.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/Record.java b/clients/src/main/java/org/apache/kafka/common/record/Record.java
index 50fac24..8390dc7 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/Record.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/Record.java
@@ -36,9 +36,13 @@ public final class Record {
     public static final int MAGIC_LENGTH = 1;
     public static final int ATTRIBUTES_OFFSET = MAGIC_OFFSET + MAGIC_LENGTH;
     public static final int ATTRIBUTE_LENGTH = 1;
-    public static final int KEY_SIZE_OFFSET = ATTRIBUTES_OFFSET + ATTRIBUTE_LENGTH;
+    public static final int TIMESTAMP_OFFSET = ATTRIBUTES_OFFSET + ATTRIBUTE_LENGTH;
+    public static final int TIMESTAMP_LENGTH = 8;
+    public static final int KEY_SIZE_OFFSET_V0 = ATTRIBUTES_OFFSET + ATTRIBUTE_LENGTH;
+    public static final int KEY_SIZE_OFFSET_V1 = TIMESTAMP_OFFSET + TIMESTAMP_LENGTH;
     public static final int KEY_SIZE_LENGTH = 4;
-    public static final int KEY_OFFSET = KEY_SIZE_OFFSET + KEY_SIZE_LENGTH;
+    public static final int KEY_OFFSET_V0 = KEY_SIZE_OFFSET_V0 + KEY_SIZE_LENGTH;
+    public static final int KEY_OFFSET_V1 = KEY_SIZE_OFFSET_V1 + KEY_SIZE_LENGTH;
     public static final int VALUE_SIZE_LENGTH = 4;
 
     /**
@@ -49,12 +53,18 @@ public final class Record {
     /**
      * The amount of overhead bytes in a record
      */
-    public static final int RECORD_OVERHEAD = HEADER_SIZE + KEY_SIZE_LENGTH + VALUE_SIZE_LENGTH;
+    public static final int RECORD_OVERHEAD = HEADER_SIZE + TIMESTAMP_LENGTH + KEY_SIZE_LENGTH + VALUE_SIZE_LENGTH;
+
+    /**
+     * The "magic" values
+     */
+    public static final byte MAGIC_VALUE_V0 = 0;
+    public static final byte MAGIC_VALUE_V1 = 1;
 
     /**
      * The current "magic" value
      */
-    public static final byte CURRENT_MAGIC_VALUE = 0;
+    public static final byte CURRENT_MAGIC_VALUE = MAGIC_VALUE_V1;
 
     /**
      * Specifies the mask for the compression code. 3 bits to hold the compression codec. 0 is reserved to indicate no
@@ -63,66 +73,92 @@ public final class Record {
     public static final int COMPRESSION_CODEC_MASK = 0x07;
 
     /**
+     * Specifies the mask for the timestamp type.
+     * 0 for CreateTime, 1 for LogAppendTime.
+     */
+    public static final byte TIMESTAMP_TYPE_MASK = 0x08;
+    public static final int TIMESTAMP_TYPE_ATTRIBUTE_OFFSET = 3;
+
+    /**
      * Compression code for uncompressed records
      */
     public static final int NO_COMPRESSION = 0;
 
+    /**
+     * Timestamp value for records without a timestamp
+     */
+    public static final long NO_TIMESTAMP = -1L;
+
     private final ByteBuffer buffer;
+    private final Long wrapperRecordTimestamp;
+    private final TimestampType wrapperRecordTimestampType;
 
     public Record(ByteBuffer buffer) {
         this.buffer = buffer;
+        this.wrapperRecordTimestamp = null;
+        this.wrapperRecordTimestampType = null;
+    }
+
+    // Package private constructor for inner iteration.
+    Record(ByteBuffer buffer, Long wrapperRecordTimestamp, TimestampType wrapperRecordTimestampType) {
+        this.buffer = buffer;
+        this.wrapperRecordTimestamp = wrapperRecordTimestamp;
+        this.wrapperRecordTimestampType = wrapperRecordTimestampType;
     }
 
     /**
      * A constructor to create a LogRecord. If the record's compression type is not none, then
      * its value payload should be already compressed with the specified type; the constructor
      * would always write the value payload as is and will not do the compression itself.
-     * 
+     *
+     * @param timestamp The timestamp of the record
      * @param key The key of the record (null, if none)
      * @param value The record value
      * @param type The compression type used on the contents of the record (if any)
      * @param valueOffset The offset into the payload array used to extract payload
      * @param valueSize The size of the payload to use
      */
-    public Record(byte[] key, byte[] value, CompressionType type, int valueOffset, int valueSize) {
+    public Record(long timestamp, byte[] key, byte[] value, CompressionType type, int valueOffset, int valueSize) {
         this(ByteBuffer.allocate(recordSize(key == null ? 0 : key.length,
             value == null ? 0 : valueSize >= 0 ? valueSize : value.length - valueOffset)));
-        write(this.buffer, key, value, type, valueOffset, valueSize);
+        write(this.buffer, timestamp, key, value, type, valueOffset, valueSize);
         this.buffer.rewind();
     }
 
-    public Record(byte[] key, byte[] value, CompressionType type) {
-        this(key, value, type, 0, -1);
+    public Record(long timestamp, byte[] key, byte[] value, CompressionType type) {
+        this(timestamp, key, value, type, 0, -1);
     }
 
-    public Record(byte[] value, CompressionType type) {
-        this(null, value, type);
+    public Record(long timestamp, byte[] value, CompressionType type) {
+        this(timestamp, null, value, type);
     }
 
-    public Record(byte[] key, byte[] value) {
-        this(key, value, CompressionType.NONE);
+    public Record(long timestamp, byte[] key, byte[] value) {
+        this(timestamp, key, value, CompressionType.NONE);
     }
 
-    public Record(byte[] value) {
-        this(null, value, CompressionType.NONE);
+    public Record(long timestamp, byte[] value) {
+        this(timestamp, null, value, CompressionType.NONE);
     }
 
     // Write a record to the buffer, if the record's compression type is none, then
     // its value payload should be already compressed with the specified type
-    public static void write(ByteBuffer buffer, byte[] key, byte[] value, CompressionType type, int valueOffset, int valueSize) {
+    public static void write(ByteBuffer buffer, long timestamp, byte[] key, byte[] value, CompressionType type, int valueOffset, int valueSize) {
         // construct the compressor with compression type none since this function will not do any
         // compression according to the input type; it will just write the record's payload as is
         Compressor compressor = new Compressor(buffer, CompressionType.NONE, buffer.capacity());
-        compressor.putRecord(key, value, type, valueOffset, valueSize);
+        compressor.putRecord(timestamp, key, value, type, valueOffset, valueSize);
     }
 
-    public static void write(Compressor compressor, long crc, byte attributes, byte[] key, byte[] value, int valueOffset, int valueSize) {
+    public static void write(Compressor compressor, long crc, byte attributes, long timestamp, byte[] key, byte[] value, int valueOffset, int valueSize) {
         // write crc
         compressor.putInt((int) (crc & 0xffffffffL));
         // write magic value
         compressor.putByte(CURRENT_MAGIC_VALUE);
         // write attributes
         compressor.putByte(attributes);
+        // write timestamp
+        compressor.putLong(timestamp);
         // write the key
         if (key == null) {
             compressor.putInt(-1);
@@ -145,7 +181,7 @@ public final class Record {
     }
 
     public static int recordSize(int keySize, int valueSize) {
-        return CRC_LENGTH + MAGIC_LENGTH + ATTRIBUTE_LENGTH + KEY_SIZE_LENGTH + keySize + VALUE_SIZE_LENGTH + valueSize;
+        return CRC_LENGTH + MAGIC_LENGTH + ATTRIBUTE_LENGTH + TIMESTAMP_LENGTH + KEY_SIZE_LENGTH + keySize + VALUE_SIZE_LENGTH + valueSize;
     }
 
     public ByteBuffer buffer() {
@@ -171,13 +207,14 @@ public final class Record {
     /**
      * Compute the checksum of the record from the attributes, key and value payloads
      */
-    public static long computeChecksum(byte[] key, byte[] value, CompressionType type, int valueOffset, int valueSize) {
+    public static long computeChecksum(long timestamp, byte[] key, byte[] value, CompressionType type, int valueOffset, int valueSize) {
         Crc32 crc = new Crc32();
         crc.update(CURRENT_MAGIC_VALUE);
         byte attributes = 0;
         if (type.id > 0)
             attributes = (byte) (attributes | (COMPRESSION_CODEC_MASK & type.id));
         crc.update(attributes);
+        crc.updateLong(timestamp);
         // update for the key
         if (key == null) {
             crc.updateInt(-1);
@@ -240,7 +277,10 @@ public final class Record {
      * The length of the key in bytes
      */
     public int keySize() {
-        return buffer.getInt(KEY_SIZE_OFFSET);
+        if (magic() == MAGIC_VALUE_V0)
+            return buffer.getInt(KEY_SIZE_OFFSET_V0);
+        else
+            return buffer.getInt(KEY_SIZE_OFFSET_V1);
     }
 
     /**
@@ -254,7 +294,10 @@ public final class Record {
      * The position where the value size is stored
      */
     private int valueSizeOffset() {
-        return KEY_OFFSET + Math.max(0, keySize());
+        if (magic() == MAGIC_VALUE_V0)
+            return KEY_OFFSET_V0 + Math.max(0, keySize());
+        else
+            return KEY_OFFSET_V1 + Math.max(0, keySize());
     }
 
     /**
@@ -279,6 +322,35 @@ public final class Record {
     }
 
     /**
+     * When magic value is greater than 0, the timestamp of a record is determined in the following way:
+     * 1. wrapperRecordTimestampType = null and wrapperRecordTimestamp is null - Uncompressed message, timestamp is in the message.
+     * 2. wrapperRecordTimestampType = LOG_APPEND_TIME and wrapperRecordTimestamp is not null - Compressed message using LOG_APPEND_TIME
+     * 3. wrapperRecordTimestampType = CREATE_TIME and wrapperRecordTimestamp is not null - Compressed message using CREATE_TIME
+     */
+    public long timestamp() {
+        if (magic() == MAGIC_VALUE_V0)
+            return NO_TIMESTAMP;
+        else {
+            // case 2
+            if (wrapperRecordTimestampType == TimestampType.LOG_APPEND_TIME && wrapperRecordTimestamp != null)
+                return wrapperRecordTimestamp;
+            // Case 1, 3
+            else
+                return buffer.getLong(TIMESTAMP_OFFSET);
+        }
+    }
+
+    /**
+     * The timestamp type of the message.
+     */
+    public TimestampType timestampType() {
+        if (magic() == 0)
+            return TimestampType.NO_TIMESTAMP_TYPE;
+        else
+            return wrapperRecordTimestampType == null ? TimestampType.getTimestampType(attributes()) : wrapperRecordTimestampType;
+    }
+
+    /**
      * The compression type used with this record
      */
     public CompressionType compressionType() {
@@ -296,7 +368,10 @@ public final class Record {
      * A ByteBuffer containing the message key
      */
     public ByteBuffer key() {
-        return sliceDelimited(KEY_SIZE_OFFSET);
+        if (magic() == MAGIC_VALUE_V0)
+            return sliceDelimited(KEY_SIZE_OFFSET_V0);
+        else
+            return sliceDelimited(KEY_SIZE_OFFSET_V1);
     }
 
     /**
@@ -317,13 +392,24 @@ public final class Record {
     }
 
     public String toString() {
-        return String.format("Record(magic = %d, attributes = %d, compression = %s, crc = %d, key = %d bytes, value = %d bytes)",
-                             magic(),
-                             attributes(),
-                             compressionType(),
-                             checksum(),
-                             key() == null ? 0 : key().limit(),
-                             value() == null ? 0 : value().limit());
+        if (magic() > 0)
+            return String.format("Record(magic = %d, attributes = %d, compression = %s, crc = %d, %s = %d, key = %d bytes, value = %d bytes)",
+                                 magic(),
+                                 attributes(),
+                                 compressionType(),
+                                 checksum(),
+                                 timestampType(),
+                                 timestamp(),
+                                 key() == null ? 0 : key().limit(),
+                                 value() == null ? 0 : value().limit());
+        else
+            return String.format("Record(magic = %d, attributes = %d, compression = %s, crc = %d, key = %d bytes, value = %d bytes)",
+                                 magic(),
+                                 attributes(),
+                                 compressionType(),
+                                 checksum(),
+                                 key() == null ? 0 : key().limit(),
+                                 value() == null ? 0 : value().limit());
     }
 
     public boolean equals(Object other) {
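
A brief sketch of the new constructors and accessors (values below are illustrative, not from the patch): with magic value 1 a record stores an 8-byte timestamp between the attributes byte and the key size, exposed via timestamp() and timestampType().

    long now = System.currentTimeMillis();
    Record record = new Record(now, "key".getBytes(), "value".getBytes());
    // record.magic() == Record.MAGIC_VALUE_V1
    // record.timestamp() == now
    // record.timestampType() == TimestampType.CREATE_TIME (no wrapper override)
    // A magic-0 record instead returns Record.NO_TIMESTAMP and TimestampType.NO_TIMESTAMP_TYPE.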

http://git-wip-us.apache.org/repos/asf/kafka/blob/45c8195f/clients/src/main/java/org/apache/kafka/common/record/TimestampType.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/TimestampType.java b/clients/src/main/java/org/apache/kafka/common/record/TimestampType.java
new file mode 100644
index 0000000..ab12a35
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/record/TimestampType.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.record;
+
+import java.util.NoSuchElementException;
+
+/**
+ * The timestamp type of the records.
+ */
+public enum TimestampType {
+    NO_TIMESTAMP_TYPE(-1, "NoTimestampType"), CREATE_TIME(0, "CreateTime"), LOG_APPEND_TIME(1, "LogAppendTime");
+
+    public final int value;
+    public final String name;
+    TimestampType(int value, String name) {
+        this.value = value;
+        this.name = name;
+    }
+
+    public static TimestampType getTimestampType(byte attributes) {
+        int timestampType = (attributes & Record.TIMESTAMP_TYPE_MASK) >> Record.TIMESTAMP_TYPE_ATTRIBUTE_OFFSET;
+        return timestampType == 0 ? CREATE_TIME : LOG_APPEND_TIME;
+    }
+
+    public static byte setTimestampType(byte attributes, TimestampType timestampType) {
+        return timestampType == CREATE_TIME ?
+                (byte) (attributes & ~Record.TIMESTAMP_TYPE_MASK) : (byte) (attributes | Record.TIMESTAMP_TYPE_MASK);
+    }
+
+    public static TimestampType forName(String name) {
+        switch (name) {
+            case "NoTimestampType":
+                return NO_TIMESTAMP_TYPE;
+            case "CreateTime":
+                return CREATE_TIME;
+            case "LogAppendTime":
+                return LOG_APPEND_TIME;
+            default:
+                throw new NoSuchElementException("Invalid timestamp type " + name);
+        }
+    }
+
+    @Override
+    public String toString() {
+        return name;
+    }
+}
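
How the timestamp type travels in the attributes byte, as a small sketch (not part of the patch): bit 3 of the attributes selects CreateTime (0) or LogAppendTime (1).

    byte attributes = 0;
    attributes = TimestampType.setTimestampType(attributes, TimestampType.LOG_APPEND_TIME);
    // attributes now has the 0x08 bit set
    TimestampType type = TimestampType.getTimestampType(attributes);   // LOG_APPEND_TIME
    TimestampType parsed = TimestampType.forName("CreateTime");        // CREATE_TIME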

http://git-wip-us.apache.org/repos/asf/kafka/blob/45c8195f/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java
index a915247..c7d41e6 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java
@@ -19,6 +19,7 @@ import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.protocol.ProtoUtils;
 import org.apache.kafka.common.protocol.types.Schema;
 import org.apache.kafka.common.protocol.types.Struct;
+import org.apache.kafka.common.record.Record;
 import org.apache.kafka.common.utils.CollectionUtils;
 
 import java.nio.ByteBuffer;
@@ -98,14 +99,15 @@ public class ProduceRequest extends AbstractRequest {
         Map<TopicPartition, ProduceResponse.PartitionResponse> responseMap = new HashMap<TopicPartition, ProduceResponse.PartitionResponse>();
 
         for (Map.Entry<TopicPartition, ByteBuffer> entry : partitionRecords.entrySet()) {
-            responseMap.put(entry.getKey(), new ProduceResponse.PartitionResponse(Errors.forException(e).code(), ProduceResponse.INVALID_OFFSET));
+            responseMap.put(entry.getKey(), new ProduceResponse.PartitionResponse(Errors.forException(e).code(), ProduceResponse.INVALID_OFFSET, Record.NO_TIMESTAMP));
         }
 
         switch (versionId) {
             case 0:
                 return new ProduceResponse(responseMap);
             case 1:
-                return new ProduceResponse(responseMap, ProduceResponse.DEFAULT_THROTTLE_TIME);
+            case 2:
+                return new ProduceResponse(responseMap, ProduceResponse.DEFAULT_THROTTLE_TIME, versionId);
             default:
                 throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d",
                         versionId, this.getClass().getSimpleName(), ProtoUtils.latestVersion(ApiKeys.PRODUCE.id)));

http://git-wip-us.apache.org/repos/asf/kafka/blob/45c8195f/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java
index c213332..58175e1 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java
@@ -52,6 +52,7 @@ public class ProduceResponse extends AbstractRequestResponse {
      */
 
     private static final String BASE_OFFSET_KEY_NAME = "base_offset";
+    private static final String TIMESTAMP_KEY_NAME = "timestamp";
 
     private final Map<TopicPartition, PartitionResponse> responses;
     private final int throttleTime;
@@ -68,18 +69,33 @@ public class ProduceResponse extends AbstractRequestResponse {
     }
 
     /**
-     * Constructor for Version 1
+     * Constructor for the latest version
      * @param responses Produced data grouped by topic-partition
      * @param throttleTime Time in milliseconds the response was throttled
      */
     public ProduceResponse(Map<TopicPartition, PartitionResponse> responses, int throttleTime) {
-        super(new Struct(CURRENT_SCHEMA));
+        this(responses, throttleTime, ProtoUtils.latestVersion(ApiKeys.PRODUCE.id));
+    }
+
+    /**
+     * Constructor for a specific version
+     * @param responses Produced data grouped by topic-partition
+     * @param throttleTime Time in milliseconds the response was throttled
+     * @param version the version of schema to use.
+     */
+    public ProduceResponse(Map<TopicPartition, PartitionResponse> responses, int throttleTime, int version) {
+        super(new Struct(ProtoUtils.responseSchema(ApiKeys.PRODUCE.id, version)));
         initCommonFields(responses);
-        struct.set(THROTTLE_TIME_KEY_NAME, throttleTime);
+        if (struct.hasField(THROTTLE_TIME_KEY_NAME))
+            struct.set(THROTTLE_TIME_KEY_NAME, throttleTime);
         this.responses = responses;
         this.throttleTime = throttleTime;
     }
 
+    /**
+     * Constructor from a {@link Struct}. It is the caller's responsibility to pass in a struct with the latest schema.
+     * @param struct the underlying struct, which must use the latest schema version
+     */
     public ProduceResponse(Struct struct) {
         super(struct);
         responses = new HashMap<TopicPartition, PartitionResponse>();
@@ -91,8 +107,9 @@ public class ProduceResponse extends AbstractRequestResponse {
                 int partition = partRespStruct.getInt(PARTITION_KEY_NAME);
                 short errorCode = partRespStruct.getShort(ERROR_CODE_KEY_NAME);
                 long offset = partRespStruct.getLong(BASE_OFFSET_KEY_NAME);
+                long timestamp = partRespStruct.getLong(TIMESTAMP_KEY_NAME);
                 TopicPartition tp = new TopicPartition(topic, partition);
-                responses.put(tp, new PartitionResponse(errorCode, offset));
+                responses.put(tp, new PartitionResponse(errorCode, offset, timestamp));
             }
         }
         this.throttleTime = struct.getInt(THROTTLE_TIME_KEY_NAME);
@@ -107,9 +124,12 @@ public class ProduceResponse extends AbstractRequestResponse {
             List<Struct> partitionArray = new ArrayList<Struct>();
             for (Map.Entry<Integer, PartitionResponse> partitionEntry : entry.getValue().entrySet()) {
                 PartitionResponse part = partitionEntry.getValue();
-                Struct partStruct = topicData.instance(PARTITION_RESPONSES_KEY_NAME).set(PARTITION_KEY_NAME,
-                                                                                         partitionEntry.getKey()).set(
-                    ERROR_CODE_KEY_NAME, part.errorCode).set(BASE_OFFSET_KEY_NAME, part.baseOffset);
+                Struct partStruct = topicData.instance(PARTITION_RESPONSES_KEY_NAME)
+                        .set(PARTITION_KEY_NAME, partitionEntry.getKey())
+                        .set(ERROR_CODE_KEY_NAME, part.errorCode)
+                        .set(BASE_OFFSET_KEY_NAME, part.baseOffset);
+                if (partStruct.hasField(TIMESTAMP_KEY_NAME))
+                    partStruct.set(TIMESTAMP_KEY_NAME, part.timestamp);
                 partitionArray.add(partStruct);
             }
             topicData.set(PARTITION_RESPONSES_KEY_NAME, partitionArray.toArray());
@@ -129,10 +149,12 @@ public class ProduceResponse extends AbstractRequestResponse {
     public static final class PartitionResponse {
         public short errorCode;
         public long baseOffset;
+        public long timestamp;
 
-        public PartitionResponse(short errorCode, long baseOffset) {
+        public PartitionResponse(short errorCode, long baseOffset, long timestamp) {
             this.errorCode = errorCode;
             this.baseOffset = baseOffset;
+            this.timestamp = timestamp;
         }
 
         @Override
@@ -143,6 +165,8 @@ public class ProduceResponse extends AbstractRequestResponse {
             b.append(errorCode);
             b.append(",offset: ");
             b.append(baseOffset);
+            b.append(",timestamp: ");
+            b.append(timestamp);
             b.append('}');
             return b.toString();
         }
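
A hedged usage sketch of the versioned response constructor (topic, partition and offset below are placeholders): version 2 of the produce response carries a per-partition timestamp, which is expected to be the log append time when the topic uses LogAppendTime and Record.NO_TIMESTAMP otherwise.

    Map<TopicPartition, ProduceResponse.PartitionResponse> responses = new HashMap<>();
    responses.put(new TopicPartition("test-topic", 0),
            new ProduceResponse.PartitionResponse(Errors.NONE.code(), 100L, Record.NO_TIMESTAMP));
    ProduceResponse response = new ProduceResponse(responses, ProduceResponse.DEFAULT_THROTTLE_TIME, 2);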

http://git-wip-us.apache.org/repos/asf/kafka/blob/45c8195f/clients/src/main/java/org/apache/kafka/common/utils/Crc32.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/utils/Crc32.java b/clients/src/main/java/org/apache/kafka/common/utils/Crc32.java
index 5b86700..caa0058 100644
--- a/clients/src/main/java/org/apache/kafka/common/utils/Crc32.java
+++ b/clients/src/main/java/org/apache/kafka/common/utils/Crc32.java
@@ -131,6 +131,17 @@ public class Crc32 implements Checksum {
         update((byte) input /* >> 0 */);
     }
 
+    final public void updateLong(long input) {
+        update((byte) (input >> 56));
+        update((byte) (input >> 48));
+        update((byte) (input >> 40));
+        update((byte) (input >> 32));
+        update((byte) (input >> 24));
+        update((byte) (input >> 16));
+        update((byte) (input >> 8));
+        update((byte) input /* >> 0 */);
+    }
+
     /*
      * CRC-32 lookup tables generated by the polynomial 0xEDB88320. See also TestPureJavaCrc32.Table.
      */
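
updateLong folds the eight big-endian bytes of a long into the running checksum, which is how the record timestamp is now covered by the CRC; a minimal sketch mirroring the start of Record.computeChecksum (values are illustrative):

    Crc32 crc = new Crc32();
    crc.update(Record.CURRENT_MAGIC_VALUE);       // magic byte
    crc.update((byte) 0);                         // attributes
    crc.updateLong(System.currentTimeMillis());   // timestamp, new in message format v1
    long checksum = crc.getValue();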