Posted to commits@kafka.apache.org by jg...@apache.org on 2017/04/29 02:40:44 UTC

[1/2] kafka git commit: KAFKA-4208; Add Record Headers

Repository: kafka
Updated Branches:
  refs/heads/trunk ca6ae8116 -> 6185bc027


http://git-wip-us.apache.org/repos/asf/kafka/blob/6185bc02/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
index 0f1cb54..af599ca 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
@@ -99,7 +99,7 @@ public class RecordAccumulatorTest {
         int appends = expectedNumAppends(batchSize);
         for (int i = 0; i < appends; i++) {
             // append to the first batch
-            accum.append(tp1, 0L, key, value, null, maxBlockTimeMs);
+            accum.append(tp1, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs);
             Deque<ProducerBatch> partitionBatches = accum.batches().get(tp1);
             assertEquals(1, partitionBatches.size());
 
@@ -110,7 +110,7 @@ public class RecordAccumulatorTest {
 
         // this append doesn't fit in the first batch, so a new batch is created and the first batch is closed
 
-        accum.append(tp1, 0L, key, value, null, maxBlockTimeMs);
+        accum.append(tp1, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs);
         Deque<ProducerBatch> partitionBatches = accum.batches().get(tp1);
         assertEquals(2, partitionBatches.size());
         Iterator<ProducerBatch> partitionBatchesIterator = partitionBatches.iterator();
@@ -136,7 +136,7 @@ public class RecordAccumulatorTest {
         byte[] value = new byte[2 * batchSize];
         RecordAccumulator accum = new RecordAccumulator(batchSize + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 10 * 1024,
                 CompressionType.NONE, 0L, 100L, metrics, time, new ApiVersions(), null);
-        accum.append(tp1, 0L, key, value, null, maxBlockTimeMs);
+        accum.append(tp1, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs);
         assertEquals("Our partition's leader should be ready", Collections.singleton(node1), accum.ready(cluster, time.milliseconds()).readyNodes);
 
         Deque<ProducerBatch> batches = accum.batches().get(tp1);
@@ -160,7 +160,7 @@ public class RecordAccumulatorTest {
         long lingerMs = 10L;
         RecordAccumulator accum = new RecordAccumulator(1024 + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 10 * 1024,
                 CompressionType.NONE, lingerMs, 100L, metrics, time, new ApiVersions(), null);
-        accum.append(tp1, 0L, key, value, null, maxBlockTimeMs);
+        accum.append(tp1, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs);
         assertEquals("No partitions should be ready", 0, accum.ready(cluster, time.milliseconds()).readyNodes.size());
         time.sleep(10);
         assertEquals("Our partition's leader should be ready", Collections.singleton(node1), accum.ready(cluster, time.milliseconds()).readyNodes);
@@ -183,7 +183,7 @@ public class RecordAccumulatorTest {
         List<TopicPartition> partitions = asList(tp1, tp2);
         for (TopicPartition tp : partitions) {
             for (int i = 0; i < appends; i++)
-                accum.append(tp, 0L, key, value, null, maxBlockTimeMs);
+                accum.append(tp, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs);
         }
         assertEquals("Partition's leader should be ready", Collections.singleton(node1), accum.ready(cluster, time.milliseconds()).readyNodes);
 
@@ -205,7 +205,7 @@ public class RecordAccumulatorTest {
                 public void run() {
                     for (int i = 0; i < msgs; i++) {
                         try {
-                            accum.append(new TopicPartition(topic, i % numParts), 0L, key, value, null, maxBlockTimeMs);
+                            accum.append(new TopicPartition(topic, i % numParts), 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs);
                         } catch (Exception e) {
                             e.printStackTrace();
                         }
@@ -249,7 +249,7 @@ public class RecordAccumulatorTest {
 
         // Partition on node1 only
         for (int i = 0; i < appends; i++)
-            accum.append(tp1, 0L, key, value, null, maxBlockTimeMs);
+            accum.append(tp1, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs);
         RecordAccumulator.ReadyCheckResult result = accum.ready(cluster, time.milliseconds());
         assertEquals("No nodes should be ready.", 0, result.readyNodes.size());
         assertEquals("Next check time should be the linger time", lingerMs, result.nextReadyCheckDelayMs);
@@ -258,14 +258,14 @@ public class RecordAccumulatorTest {
 
         // Add partition on node2 only
         for (int i = 0; i < appends; i++)
-            accum.append(tp3, 0L, key, value, null, maxBlockTimeMs);
+            accum.append(tp3, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs);
         result = accum.ready(cluster, time.milliseconds());
         assertEquals("No nodes should be ready.", 0, result.readyNodes.size());
         assertEquals("Next check time should be defined by node1, half remaining linger time", lingerMs / 2, result.nextReadyCheckDelayMs);
 
         // Add data for another partition on node1, enough to make data sendable immediately
         for (int i = 0; i < appends + 1; i++)
-            accum.append(tp2, 0L, key, value, null, maxBlockTimeMs);
+            accum.append(tp2, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs);
         result = accum.ready(cluster, time.milliseconds());
         assertEquals("Node1 should be ready", Collections.singleton(node1), result.readyNodes);
         // Note this can actually be < linger time because it may use delays from partitions that aren't sendable
@@ -281,7 +281,7 @@ public class RecordAccumulatorTest {
                 CompressionType.NONE, lingerMs, retryBackoffMs, metrics, time, new ApiVersions(), null);
 
         long now = time.milliseconds();
-        accum.append(tp1, 0L, key, value, null, maxBlockTimeMs);
+        accum.append(tp1, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs);
         RecordAccumulator.ReadyCheckResult result = accum.ready(cluster, now + lingerMs + 1);
         assertEquals("Node1 should be ready", Collections.singleton(node1), result.readyNodes);
         Map<Integer, List<ProducerBatch>> batches = accum.drain(cluster, result.readyNodes, Integer.MAX_VALUE, now + lingerMs + 1);
@@ -293,7 +293,7 @@ public class RecordAccumulatorTest {
         accum.reenqueue(batches.get(0).get(0), now);
 
         // Put message for partition 1 into accumulator
-        accum.append(tp2, 0L, key, value, null, maxBlockTimeMs);
+        accum.append(tp2, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs);
         result = accum.ready(cluster, now + lingerMs + 1);
         assertEquals("Node1 should be ready", Collections.singleton(node1), result.readyNodes);
 
@@ -318,7 +318,7 @@ public class RecordAccumulatorTest {
         final RecordAccumulator accum = new RecordAccumulator(4 * 1024 + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 64 * 1024,
                 CompressionType.NONE, lingerMs, 100L, metrics, time, new ApiVersions(), null);
         for (int i = 0; i < 100; i++)
-            accum.append(new TopicPartition(topic, i % 3), 0L, key, value, null, maxBlockTimeMs);
+            accum.append(new TopicPartition(topic, i % 3), 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs);
         RecordAccumulator.ReadyCheckResult result = accum.ready(cluster, time.milliseconds());
         assertEquals("No nodes should be ready.", 0, result.readyNodes.size());
         
@@ -351,7 +351,7 @@ public class RecordAccumulatorTest {
     public void testAwaitFlushComplete() throws Exception {
         RecordAccumulator accum = new RecordAccumulator(4 * 1024 + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 64 * 1024,
                 CompressionType.NONE, Long.MAX_VALUE, 100L, metrics, time, new ApiVersions(), null);
-        accum.append(new TopicPartition(topic, 0), 0L, key, value, null, maxBlockTimeMs);
+        accum.append(new TopicPartition(topic, 0), 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs);
 
         accum.beginFlush();
         assertTrue(accum.flushInProgress());
@@ -379,7 +379,7 @@ public class RecordAccumulatorTest {
             }
         }
         for (int i = 0; i < 100; i++)
-            accum.append(new TopicPartition(topic, i % 3), 0L, key, value, new TestCallback(), maxBlockTimeMs);
+            accum.append(new TopicPartition(topic, i % 3), 0L, key, value, null, new TestCallback(), maxBlockTimeMs);
         RecordAccumulator.ReadyCheckResult result = accum.ready(cluster, time.milliseconds());
         assertEquals("No nodes should be ready.", 0, result.readyNodes.size());
 
@@ -404,11 +404,11 @@ public class RecordAccumulatorTest {
 
         // Test batches not in retry
         for (int i = 0; i < appends; i++) {
-            accum.append(tp1, 0L, key, value, null, maxBlockTimeMs);
+            accum.append(tp1, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs);
             assertEquals("No partitions should be ready.", 0, accum.ready(cluster, time.milliseconds()).readyNodes.size());
         }
         // Make the batches ready due to batch full
-        accum.append(tp1, 0L, key, value, null, 0);
+        accum.append(tp1, 0L, key, value, Record.EMPTY_HEADERS, null, 0);
         Set<Node> readyNodes = accum.ready(cluster, time.milliseconds()).readyNodes;
         assertEquals("Our partition's leader should be ready", Collections.singleton(node1), readyNodes);
         // Advance the clock to expire the batch.
@@ -438,7 +438,7 @@ public class RecordAccumulatorTest {
 
         // Test batches in retry.
         // Create a retried batch
-        accum.append(tp1, 0L, key, value, null, 0);
+        accum.append(tp1, 0L, key, value, Record.EMPTY_HEADERS, null, 0);
         time.sleep(lingerMs);
         readyNodes = accum.ready(cluster, time.milliseconds()).readyNodes;
         assertEquals("Our partition's leader should be ready", Collections.singleton(node1), readyNodes);
@@ -479,7 +479,7 @@ public class RecordAccumulatorTest {
                 if (exception instanceof TimeoutException) {
                     expiryCallbackCount.incrementAndGet();
                     try {
-                        accum.append(tp1, 0L, key, value, null, maxBlockTimeMs);
+                        accum.append(tp1, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs);
                     } catch (InterruptedException e) {
                         throw new RuntimeException("Unexpected interruption", e);
                     }
@@ -489,7 +489,7 @@ public class RecordAccumulatorTest {
         };
 
         for (int i = 0; i < messagesPerBatch + 1; i++)
-            accum.append(tp1, 0L, key, value, callback, maxBlockTimeMs);
+            accum.append(tp1, 0L, key, value, null, callback, maxBlockTimeMs);
 
         assertEquals(2, accum.batches().get(tp1).size());
         assertTrue("First batch not full", accum.batches().get(tp1).peekFirst().isFull());
@@ -514,7 +514,7 @@ public class RecordAccumulatorTest {
                 CompressionType.NONE, 10, 100L, metrics, time, new ApiVersions(), null);
         int appends = expectedNumAppends(batchSize);
         for (int i = 0; i < appends; i++) {
-            accum.append(tp1, 0L, key, value, null, maxBlockTimeMs);
+            accum.append(tp1, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs);
             assertEquals("No partitions should be ready.", 0, accum.ready(cluster, now).readyNodes.size());
         }
         time.sleep(2000);
@@ -549,7 +549,7 @@ public class RecordAccumulatorTest {
                 (short) 0, (short) 2))));
         RecordAccumulator accum = new RecordAccumulator(batchSize + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 10 * batchSize,
                 CompressionType.NONE, 10, 100L, metrics, time, apiVersions, new TransactionManager());
-        accum.append(tp1, 0L, key, value, null, 0);
+        accum.append(tp1, 0L, key, value, Record.EMPTY_HEADERS, null, 0);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/kafka/blob/6185bc02/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
index 7de378d..934c895 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
@@ -96,7 +96,7 @@ public class SenderTest {
     @Test
     public void testSimple() throws Exception {
         long offset = 0;
-        Future<RecordMetadata> future = accumulator.append(tp0, 0L, "key".getBytes(), "value".getBytes(), null, MAX_BLOCK_TIMEOUT).future;
+        Future<RecordMetadata> future = accumulator.append(tp0, 0L, "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT).future;
         sender.run(time.milliseconds()); // connect
         sender.run(time.milliseconds()); // send produce request
         assertEquals("We should have a single produce request in flight.", 1, client.inFlightRequestCount());
@@ -121,7 +121,7 @@ public class SenderTest {
         apiVersions.update("0", NodeApiVersions.create());
 
         Future<RecordMetadata> future = accumulator.append(tp0, 0L, "key".getBytes(), "value".getBytes(),
-                null, MAX_BLOCK_TIMEOUT).future;
+                null, null, MAX_BLOCK_TIMEOUT).future;
 
         // now the partition leader supports only v2
         apiVersions.update("0", NodeApiVersions.create(Collections.singleton(
@@ -161,14 +161,14 @@ public class SenderTest {
         apiVersions.update("0", NodeApiVersions.create());
 
         Future<RecordMetadata> future1 = accumulator.append(tp0, 0L, "key".getBytes(), "value".getBytes(),
-                null, MAX_BLOCK_TIMEOUT).future;
+                null, null, MAX_BLOCK_TIMEOUT).future;
 
         // now the partition leader supports only v2
         apiVersions.update("0", NodeApiVersions.create(Collections.singleton(
                 new ApiVersionsResponse.ApiVersion(ApiKeys.PRODUCE.id, (short) 0, (short) 2))));
 
         Future<RecordMetadata> future2 = accumulator.append(tp1, 0L, "key".getBytes(), "value".getBytes(),
-                null, MAX_BLOCK_TIMEOUT).future;
+                null, null, MAX_BLOCK_TIMEOUT).future;
 
         // start off support produce request v3
         apiVersions.update("0", NodeApiVersions.create());
@@ -212,7 +212,7 @@ public class SenderTest {
     public void testQuotaMetrics() throws Exception {
         final long offset = 0;
         for (int i = 1; i <= 3; i++) {
-            accumulator.append(tp0, 0L, "key".getBytes(), "value".getBytes(), null, MAX_BLOCK_TIMEOUT);
+            accumulator.append(tp0, 0L, "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT);
             sender.run(time.milliseconds()); // send produce request
             client.respond(produceResponse(tp0, offset, Errors.NONE, 100 * i));
             sender.run(time.milliseconds());
@@ -245,7 +245,7 @@ public class SenderTest {
                     apiVersions
             );
             // do a successful retry
-            Future<RecordMetadata> future = accumulator.append(tp0, 0L, "key".getBytes(), "value".getBytes(), null, MAX_BLOCK_TIMEOUT).future;
+            Future<RecordMetadata> future = accumulator.append(tp0, 0L, "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT).future;
             sender.run(time.milliseconds()); // connect
             sender.run(time.milliseconds()); // send produce request
             String id = client.requests().peek().destination();
@@ -269,7 +269,7 @@ public class SenderTest {
             assertEquals(offset, future.get().offset());
 
             // do an unsuccessful retry
-            future = accumulator.append(tp0, 0L, "key".getBytes(), "value".getBytes(), null, MAX_BLOCK_TIMEOUT).future;
+            future = accumulator.append(tp0, 0L, "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT).future;
             sender.run(time.milliseconds()); // send produce request
             for (int i = 0; i < maxRetries + 1; i++) {
                 client.disconnect(client.requests().peek().destination());
@@ -309,7 +309,7 @@ public class SenderTest {
 
             // Send the first message.
             TopicPartition tp2 = new TopicPartition("test", 1);
-            accumulator.append(tp2, 0L, "key1".getBytes(), "value1".getBytes(), null, MAX_BLOCK_TIMEOUT);
+            accumulator.append(tp2, 0L, "key1".getBytes(), "value1".getBytes(), null, null, MAX_BLOCK_TIMEOUT);
             sender.run(time.milliseconds()); // connect
             sender.run(time.milliseconds()); // send produce request
             String id = client.requests().peek().destination();
@@ -321,7 +321,7 @@ public class SenderTest {
 
             time.sleep(900);
             // Now send another message to tp2
-            accumulator.append(tp2, 0L, "key2".getBytes(), "value2".getBytes(), null, MAX_BLOCK_TIMEOUT);
+            accumulator.append(tp2, 0L, "key2".getBytes(), "value2".getBytes(), null, null, MAX_BLOCK_TIMEOUT);
 
             // Update metadata before sender receives response from broker 0. Now partition 2 moves to broker 0
             Cluster cluster2 = TestUtils.singletonCluster("test", 2);
@@ -344,7 +344,7 @@ public class SenderTest {
         long offset = 0;
         metadata.update(Cluster.empty(), Collections.<String>emptySet(), time.milliseconds());
 
-        Future<RecordMetadata> future = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, MAX_BLOCK_TIMEOUT).future;
+        Future<RecordMetadata> future = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT).future;
         sender.run(time.milliseconds());
         assertTrue("Topic not added to metadata", metadata.containsTopic(tp0.topic()));
         metadata.update(cluster, Collections.<String>emptySet(), time.milliseconds());
@@ -360,7 +360,7 @@ public class SenderTest {
         time.sleep(Metadata.TOPIC_EXPIRY_MS);
         metadata.update(Cluster.empty(), Collections.<String>emptySet(), time.milliseconds());
         assertFalse("Unused topic has not been expired", metadata.containsTopic(tp0.topic()));
-        future = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, MAX_BLOCK_TIMEOUT).future;
+        future = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT).future;
         sender.run(time.milliseconds());
         assertTrue("Topic not added to metadata", metadata.containsTopic(tp0.topic()));
         metadata.update(cluster, Collections.<String>emptySet(), time.milliseconds());
@@ -416,7 +416,7 @@ public class SenderTest {
                 apiVersions
         );
 
-        Future<RecordMetadata> responseFuture = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, MAX_BLOCK_TIMEOUT).future;
+        Future<RecordMetadata> responseFuture = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT).future;
         client.prepareResponse(new MockClient.RequestMatcher() {
             @Override
             public boolean matches(AbstractRequest body) {
@@ -469,7 +469,7 @@ public class SenderTest {
                 apiVersions
         );
 
-        Future<RecordMetadata> responseFuture = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, MAX_BLOCK_TIMEOUT).future;
+        Future<RecordMetadata> responseFuture = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT).future;
         sender.run(time.milliseconds());  // connect.
         sender.run(time.milliseconds());  // send.
         String id = client.requests().peek().destination();
@@ -518,7 +518,7 @@ public class SenderTest {
                 apiVersions
         );
 
-        Future<RecordMetadata> responseFuture = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, MAX_BLOCK_TIMEOUT).future;
+        Future<RecordMetadata> responseFuture = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT).future;
         sender.run(time.milliseconds());  // connect.
         sender.run(time.milliseconds());  // send.
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/6185bc02/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
index 7c5b2b5..a1efa58 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
@@ -33,6 +33,7 @@ import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.record.CompressionType;
 import org.apache.kafka.common.record.MemoryRecords;
 import org.apache.kafka.common.record.MutableRecordBatch;
+import org.apache.kafka.common.record.Record;
 import org.apache.kafka.common.record.RecordBatch;
 import org.apache.kafka.common.requests.AbstractRequest;
 import org.apache.kafka.common.requests.AddOffsetsToTxnRequest;
@@ -166,7 +167,7 @@ public class TransactionManagerTest {
         transactionManager.maybeAddPartitionToTransaction(tp0);
 
         Future<RecordMetadata> responseFuture = accumulator.append(tp0, time.milliseconds(), "key".getBytes(),
-                "value".getBytes(), null, MAX_BLOCK_TIMEOUT).future;
+                                                                   "value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT).future;
 
         assertFalse(responseFuture.isDone());
         prepareAddPartitionsToTxnResponse(Errors.NONE, tp0, epoch, pid);
@@ -309,7 +310,7 @@ public class TransactionManagerTest {
         transactionManager.maybeAddPartitionToTransaction(tp0);
 
         Future<RecordMetadata> responseFuture = accumulator.append(tp0, time.milliseconds(), "key".getBytes(),
-                "value".getBytes(), null, MAX_BLOCK_TIMEOUT).future;
+                "value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT).future;
 
         assertFalse(responseFuture.isDone());
 
@@ -365,7 +366,7 @@ public class TransactionManagerTest {
         transactionManager.maybeAddPartitionToTransaction(tp0);
 
         Future<RecordMetadata> responseFuture = accumulator.append(tp0, time.milliseconds(), "key".getBytes(),
-                "value".getBytes(), null, MAX_BLOCK_TIMEOUT).future;
+                "value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT).future;
 
         assertFalse(responseFuture.isDone());
         prepareAddPartitionsToTxnResponse(Errors.NONE, tp0, epoch, pid);
@@ -380,7 +381,7 @@ public class TransactionManagerTest {
         // In the mean time, the user does a second produce to a different partition
         transactionManager.maybeAddPartitionToTransaction(tp1);
         Future<RecordMetadata> secondResponseFuture = accumulator.append(tp0, time.milliseconds(), "key".getBytes(),
-                "value".getBytes(), null, MAX_BLOCK_TIMEOUT).future;
+                "value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT).future;
 
         prepareAddPartitionsToTxnResponse(Errors.NONE, tp1, epoch, pid);
         prepareProduceResponse(Errors.NONE, pid, epoch);
@@ -426,7 +427,7 @@ public class TransactionManagerTest {
         transactionManager.maybeAddPartitionToTransaction(tp0);
 
         Future<RecordMetadata> responseFuture = accumulator.append(tp0, time.milliseconds(), "key".getBytes(),
-                "value".getBytes(), null, MAX_BLOCK_TIMEOUT).future;
+                "value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT).future;
 
         assertFalse(responseFuture.isDone());
         prepareAddPartitionsToTxnResponse(Errors.NONE, tp0, epoch, pid);
@@ -460,7 +461,7 @@ public class TransactionManagerTest {
         transactionManager.maybeAddPartitionToTransaction(tp0);
 
         Future<RecordMetadata> responseFuture = accumulator.append(tp0, time.milliseconds(), "key".getBytes(),
-                "value".getBytes(), new MockCallback(transactionManager), MAX_BLOCK_TIMEOUT).future;
+                "value".getBytes(), Record.EMPTY_HEADERS, new MockCallback(transactionManager), MAX_BLOCK_TIMEOUT).future;
 
         FutureTransactionalResult commitResult = transactionManager.beginCommittingTransaction();
         assertFalse(responseFuture.isDone());

http://git-wip-us.apache.org/repos/asf/kafka/blob/6185bc02/clients/src/test/java/org/apache/kafka/common/header/internals/RecordHeadersTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/header/internals/RecordHeadersTest.java b/clients/src/test/java/org/apache/kafka/common/header/internals/RecordHeadersTest.java
new file mode 100644
index 0000000..39c1c9c
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/common/header/internals/RecordHeadersTest.java
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.header.internals;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Iterator;
+
+import org.apache.kafka.common.header.Header;
+import org.apache.kafka.common.header.Headers;
+import org.junit.Test;
+
+public class RecordHeadersTest {
+
+    @Test
+    public void testAdd() {
+        Headers headers = new RecordHeaders();
+        headers.add(new RecordHeader("key", "value".getBytes()));
+
+        Header header = headers.iterator().next();
+        assertHeader("key", "value", header);
+
+        headers.add(new RecordHeader("key2", "value2".getBytes()));
+
+        assertHeader("key2", "value2", headers.lastHeader("key2"));
+        assertEquals(2, getCount(headers));
+    }
+
+    @Test
+    public void testRemove() {
+        Headers headers = new RecordHeaders();
+        headers.add(new RecordHeader("key", "value".getBytes()));
+
+        assertTrue(headers.iterator().hasNext());
+
+        headers.remove("key");
+
+        assertFalse(headers.iterator().hasNext());
+    }
+
+    @Test
+    public void testAddRemoveInterleaved() {
+        Headers headers = new RecordHeaders();
+        headers.add(new RecordHeader("key", "value".getBytes()));
+        headers.add(new RecordHeader("key2", "value2".getBytes()));
+
+        assertTrue(headers.iterator().hasNext());
+
+        headers.remove("key");
+
+        assertEquals(1, getCount(headers));
+
+        headers.add(new RecordHeader("key3", "value3".getBytes()));
+        
+        assertNull(headers.lastHeader("key"));
+
+        assertHeader("key2", "value2", headers.lastHeader("key2"));
+
+        assertHeader("key3", "value3", headers.lastHeader("key3"));
+
+        assertEquals(2, getCount(headers));
+
+        headers.remove("key2");
+
+        assertNull(headers.lastHeader("key"));
+
+        assertNull(headers.lastHeader("key2"));
+
+        assertHeader("key3", "value3", headers.lastHeader("key3"));
+
+        assertEquals(1, getCount(headers));
+
+        headers.add(new RecordHeader("key3", "value4".getBytes()));
+
+        assertHeader("key3", "value4", headers.lastHeader("key3"));
+
+        assertEquals(2, getCount(headers));
+
+        headers.add(new RecordHeader("key", "valueNew".getBytes()));
+
+        assertEquals(3, getCount(headers));
+
+
+        assertHeader("key", "valueNew", headers.lastHeader("key"));
+
+        headers.remove("key3");
+
+        assertEquals(1, getCount(headers));
+
+        assertNull(headers.lastHeader("key2"));
+
+        headers.remove("key");
+
+        assertFalse(headers.iterator().hasNext());
+    }
+
+    @Test
+    public void testLastHeader() {
+        Headers headers = new RecordHeaders();
+        headers.add(new RecordHeader("key", "value".getBytes()));
+        headers.add(new RecordHeader("key", "value2".getBytes()));
+        headers.add(new RecordHeader("key", "value3".getBytes()));
+
+        assertHeader("key", "value3", headers.lastHeader("key"));
+        assertEquals(3, getCount(headers));
+
+    }
+
+    @Test
+    public void testReadOnly() throws IOException {
+        RecordHeaders headers = new RecordHeaders();
+        headers.add(new RecordHeader("key", "value".getBytes()));
+        Iterator<Header> headerIteratorBeforeClose = headers.iterator();
+        headers.setReadOnly();
+        try {
+            headers.add(new RecordHeader("key", "value".getBytes()));
+            fail("IllegalStateException expected as headers are closed");
+        } catch (IllegalStateException ise) {
+            //expected  
+        }
+
+        try {
+            headers.remove("key");
+            fail("IllegalStateException expected as headers are closed");
+        } catch (IllegalStateException ise) {
+            //expected  
+        }
+
+        try {
+            Iterator<Header> headerIterator = headers.iterator();
+            headerIterator.next();
+            headerIterator.remove();
+            fail("IllegalStateException expected as headers are closed");
+        } catch (IllegalStateException ise) {
+            //expected  
+        }
+        
+        try {
+            headerIteratorBeforeClose.next();
+            headerIteratorBeforeClose.remove();
+            fail("IllegalStateException expected as headers are closed");
+        } catch (IllegalStateException ise) {
+            //expected  
+        }
+    }
+
+    @Test
+    public void testHeaders() throws IOException {
+        RecordHeaders headers = new RecordHeaders();
+        headers.add(new RecordHeader("key", "value".getBytes()));
+        headers.add(new RecordHeader("key1", "key1value".getBytes()));
+        headers.add(new RecordHeader("key", "value2".getBytes()));
+        headers.add(new RecordHeader("key2", "key2value".getBytes()));
+
+
+        Iterator<Header> keyHeaders = headers.headers("key").iterator();
+        assertHeader("key", "value", keyHeaders.next());
+        assertHeader("key", "value2", keyHeaders.next());
+        assertFalse(keyHeaders.hasNext());
+
+        keyHeaders = headers.headers("key1").iterator();
+        assertHeader("key1", "key1value", keyHeaders.next());
+        assertFalse(keyHeaders.hasNext());
+
+        keyHeaders = headers.headers("key2").iterator();
+        assertHeader("key2", "key2value", keyHeaders.next());
+        assertFalse(keyHeaders.hasNext());
+
+    }
+
+    @Test
+    public void testNew() throws IOException {
+        RecordHeaders headers = new RecordHeaders();
+        headers.add(new RecordHeader("key", "value".getBytes()));
+        headers.setReadOnly();
+
+        RecordHeaders newHeaders = new RecordHeaders(headers);
+        newHeaders.add(new RecordHeader("key", "value2".getBytes()));
+
+        //Ensure existing headers are not modified
+        assertHeader("key", "value", headers.lastHeader("key"));
+        assertEquals(1, getCount(headers));
+
+        //Ensure new headers are modified
+        assertHeader("key", "value2", newHeaders.lastHeader("key"));
+        assertEquals(2, getCount(newHeaders));
+    }
+
+    private int getCount(Headers headers) {
+        int count = 0;
+        Iterator<Header> headerIterator = headers.iterator();
+        while (headerIterator.hasNext()) {
+            headerIterator.next();
+            count++;
+        }
+        return count;
+    }
+    
+    static void assertHeader(String key, String value, Header actual) {
+        assertEquals(key, actual.key());
+        assertTrue(Arrays.equals(value.getBytes(), actual.value()));
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/6185bc02/clients/src/test/java/org/apache/kafka/common/record/DefaultRecordBatchTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/record/DefaultRecordBatchTest.java b/clients/src/test/java/org/apache/kafka/common/record/DefaultRecordBatchTest.java
index a50c5b2..57f4663 100644
--- a/clients/src/test/java/org/apache/kafka/common/record/DefaultRecordBatchTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/record/DefaultRecordBatchTest.java
@@ -16,6 +16,8 @@
  */
 package org.apache.kafka.common.record;
 
+import org.apache.kafka.common.header.Header;
+import org.apache.kafka.common.header.internals.RecordHeader;
 import org.apache.kafka.common.utils.CloseableIterator;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.test.TestUtils;
@@ -90,8 +92,8 @@ public class DefaultRecordBatchTest {
     @Test
     public void testSizeInBytes() {
         Header[] headers = new Header[] {
-            new Header("foo", "value".getBytes()),
-            new Header("bar", Utils.wrapNullable(null))
+            new RecordHeader("foo", "value".getBytes()),
+            new RecordHeader("bar", (byte[]) null)
         };
 
         long timestamp = System.currentTimeMillis();

http://git-wip-us.apache.org/repos/asf/kafka/blob/6185bc02/clients/src/test/java/org/apache/kafka/common/record/DefaultRecordTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/record/DefaultRecordTest.java b/clients/src/test/java/org/apache/kafka/common/record/DefaultRecordTest.java
index 8502475..251db15 100644
--- a/clients/src/test/java/org/apache/kafka/common/record/DefaultRecordTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/record/DefaultRecordTest.java
@@ -16,7 +16,8 @@
  */
 package org.apache.kafka.common.record;
 
-import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.common.header.Header;
+import org.apache.kafka.common.header.internals.RecordHeader;
 import org.junit.Test;
 
 import java.nio.ByteBuffer;
@@ -31,9 +32,9 @@ public class DefaultRecordTest {
     @Test
     public void testBasicSerde() {
         Header[] headers = new Header[] {
-            new Header("foo", "value".getBytes()),
-            new Header("bar", Utils.wrapNullable(null)),
-            new Header("\"A\\u00ea\\u00f1\\u00fcC\"", "value".getBytes())
+            new RecordHeader("foo", "value".getBytes()),
+            new RecordHeader("bar", (byte[]) null),
+            new RecordHeader("\"A\\u00ea\\u00f1\\u00fcC\"", "value".getBytes())
         };
 
         SimpleRecord[] records = new SimpleRecord[] {

http://git-wip-us.apache.org/repos/asf/kafka/blob/6185bc02/core/src/main/scala/kafka/consumer/BaseConsumer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/BaseConsumer.scala b/core/src/main/scala/kafka/consumer/BaseConsumer.scala
index c1ee7cd..b1a203f 100644
--- a/core/src/main/scala/kafka/consumer/BaseConsumer.scala
+++ b/core/src/main/scala/kafka/consumer/BaseConsumer.scala
@@ -26,6 +26,8 @@ import kafka.message.Message
 import org.apache.kafka.clients.consumer.internals.NoOpConsumerRebalanceListener
 import org.apache.kafka.common.record.TimestampType
 import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.header.Headers
+import org.apache.kafka.common.header.internals.RecordHeaders
 
 /**
  * A base consumer used to abstract both old and new consumer
@@ -45,7 +47,8 @@ case class BaseConsumerRecord(topic: String,
                               timestamp: Long = Message.NoTimestamp,
                               timestampType: TimestampType = TimestampType.NO_TIMESTAMP_TYPE,
                               key: Array[Byte],
-                              value: Array[Byte])
+                              value: Array[Byte],
+                              headers: Headers = new RecordHeaders())
 
 class NewShinyConsumer(topic: Option[String], partitionId: Option[Int], offset: Option[Long], whitelist: Option[String], consumerProps: Properties, val timeoutMs: Long = Long.MaxValue) extends BaseConsumer {
   import org.apache.kafka.clients.consumer.KafkaConsumer
@@ -97,7 +100,8 @@ class NewShinyConsumer(topic: Option[String], partitionId: Option[Int], offset:
                        record.timestamp,
                        record.timestampType,
                        record.key,
-                       record.value)
+                       record.value,
+                       record.headers)
   }
 
   override def stop() {
@@ -132,7 +136,8 @@ class OldConsumer(topicFilter: TopicFilter, consumerProps: Properties) extends B
                        messageAndMetadata.timestamp,
                        messageAndMetadata.timestampType,
                        messageAndMetadata.key,
-                       messageAndMetadata.message)
+                       messageAndMetadata.message, 
+                       new RecordHeaders())
   }
 
   override def stop() {

http://git-wip-us.apache.org/repos/asf/kafka/blob/6185bc02/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
index 393fee6..6d27e85 100755
--- a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
+++ b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
@@ -138,7 +138,7 @@ object ConsoleConsumer extends Logging {
       messageCount += 1
       try {
         formatter.writeTo(new ConsumerRecord(msg.topic, msg.partition, msg.offset, msg.timestamp,
-                                             msg.timestampType, 0, 0, 0, msg.key, msg.value), output)
+                                             msg.timestampType, 0, 0, 0, msg.key, msg.value, msg.headers), output)
       } catch {
         case e: Throwable =>
           if (skipMessageOnError) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/6185bc02/core/src/main/scala/kafka/tools/MirrorMaker.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/MirrorMaker.scala b/core/src/main/scala/kafka/tools/MirrorMaker.scala
index d55ed6c..b3a7978 100755
--- a/core/src/main/scala/kafka/tools/MirrorMaker.scala
+++ b/core/src/main/scala/kafka/tools/MirrorMaker.scala
@@ -43,6 +43,7 @@ import scala.collection.JavaConverters._
 import scala.collection.mutable.HashMap
 import scala.util.control.ControlThrowable
 import org.apache.kafka.clients.consumer.{ConsumerConfig => NewConsumerConfig}
+import org.apache.kafka.common.header.internals.RecordHeaders
 import org.apache.kafka.common.record.RecordBatch
 
 /**
@@ -559,7 +560,8 @@ object MirrorMaker extends Logging with KafkaMetricsGroup {
                          messageAndMetadata.timestamp,
                          messageAndMetadata.timestampType,
                          messageAndMetadata.key,
-                         messageAndMetadata.message)
+                         messageAndMetadata.message,
+                         new RecordHeaders())
     }
 
     override def stop() {

http://git-wip-us.apache.org/repos/asf/kafka/blob/6185bc02/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
index 9ec544c..4a49833 100644
--- a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
+++ b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
@@ -23,8 +23,9 @@ import org.apache.kafka.clients.consumer._
 import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
 import org.apache.kafka.common.{MetricName, TopicPartition}
 import org.apache.kafka.common.errors.InvalidTopicException
+import org.apache.kafka.common.header.Headers
 import org.apache.kafka.common.record.{CompressionType, TimestampType}
-import org.apache.kafka.common.serialization.{ByteArrayDeserializer, ByteArraySerializer, StringDeserializer, StringSerializer}
+import org.apache.kafka.common.serialization._
 import org.apache.kafka.common.utils.Utils
 import org.apache.kafka.test.{MockConsumerInterceptor, MockProducerInterceptor}
 import org.junit.Assert._
@@ -37,6 +38,95 @@ import scala.collection.mutable.Buffer
 class PlaintextConsumerTest extends BaseConsumerTest {
 
   @Test
+  def testHeaders() {
+    val numRecords = 1
+    val record = new ProducerRecord(tp.topic(), tp.partition(), null, s"key".getBytes, s"value".getBytes)
+    
+    record.headers().add(s"headerKey", s"headerValue".getBytes)
+    
+    this.producers.head.send(record)
+    
+    assertEquals(0, this.consumers.head.assignment.size)
+    this.consumers.head.assign(List(tp).asJava)
+    assertEquals(1, this.consumers.head.assignment.size)
+
+    this.consumers.head.seek(tp, 0)
+    val records = consumeRecords(consumer = this.consumers.head, numRecords = numRecords)
+
+    assertEquals(numRecords, records.size)
+
+    for (i <- 0 until numRecords) {
+      val record = records(i)
+      val header = record.headers().lastHeader(s"headerKey")
+      assertEquals(s"headerValue", if (header == null) null else new String(header.value()))
+    }
+  }
+  
+  @Test
+  def testHeadersExtendedSerializerDeserializer() {
+    val numRecords = 1
+    val record = new ProducerRecord(tp.topic(), tp.partition(), null, s"key".getBytes, s"value".getBytes)
+
+    val extendedSerializer = new ExtendedSerializer[Array[Byte]] {
+      
+      var serializer = new ByteArraySerializer()
+      
+      override def serialize(topic: String, headers: Headers, data: Array[Byte]): Array[Byte] = {
+        headers.add(s"content-type", s"application/octet-stream".getBytes)
+        serializer.serialize(topic, data)
+      }
+
+      override def configure(configs: util.Map[String, _], isKey: Boolean): Unit = serializer.configure(configs, isKey)
+      
+      override def close(): Unit = serializer.close()
+
+      override def serialize(topic: String, data: Array[Byte]): Array[Byte] = {
+        fail("method should not be invoked")
+        null
+      }
+    }
+
+
+    val extendedDeserializer = new ExtendedDeserializer[Array[Byte]] {
+      
+      var deserializer = new ByteArrayDeserializer()
+      
+      override def deserialize(topic: String, headers: Headers, data: Array[Byte]): Array[Byte] = {
+        var header = headers.lastHeader(s"content-type")
+        assertEquals(s"application/octet-stream", if (header == null) null else new String(header.value()))
+        deserializer.deserialize(topic, data)
+      }
+
+      override def configure(configs: util.Map[String, _], isKey: Boolean): Unit = deserializer.configure(configs, isKey)
+
+
+      override def close(): Unit = deserializer.close()
+
+      override def deserialize(topic: String, data: Array[Byte]): Array[Byte] = {
+        fail("method should not be invoked")
+        null
+      }
+
+    }
+    
+    val producer0 = new KafkaProducer(this.producerConfig, new ByteArraySerializer(), extendedSerializer)
+    producers += producer0
+    producer0.send(record)
+
+    val consumer0 = new KafkaConsumer(this.consumerConfig, new ByteArrayDeserializer(), extendedDeserializer)
+    consumers += consumer0
+
+    assertEquals(0, consumer0.assignment.size)
+    consumer0.assign(List(tp).asJava)
+    assertEquals(1, consumer0.assignment.size)
+
+    consumer0.seek(tp, 0)
+    val records = consumeRecords(consumer = consumer0, numRecords = numRecords)
+
+    assertEquals(numRecords, records.size)
+  }
+  
+  @Test
   def testMaxPollRecords() {
     val maxPollRecords = 2
     val numRecords = 10000
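
The Scala test above wires header-aware serializers into KafkaProducer and KafkaConsumer. For comparison, a minimal Java sketch of the same ExtendedDeserializer hook (the "content-type" header key and the class name are illustrative assumptions mirroring the test, not part of this commit):

    import java.util.Map;
    import org.apache.kafka.common.header.Header;
    import org.apache.kafka.common.header.Headers;
    import org.apache.kafka.common.serialization.ByteArrayDeserializer;
    import org.apache.kafka.common.serialization.ExtendedDeserializer;

    // A header-aware deserializer that inspects a header before delegating
    // to a plain ByteArrayDeserializer.
    public class HeaderAwareDeserializer implements ExtendedDeserializer<byte[]> {
        private final ByteArrayDeserializer delegate = new ByteArrayDeserializer();

        @Override
        public byte[] deserialize(String topic, Headers headers, byte[] data) {
            Header contentType = headers.lastHeader("content-type");
            if (contentType != null)
                System.out.println("content-type: " + new String(contentType.value()));
            return delegate.deserialize(topic, data);
        }

        @Override
        public byte[] deserialize(String topic, byte[] data) {
            // Used only by callers that do not pass headers through.
            return delegate.deserialize(topic, data);
        }

        @Override
        public void configure(Map<String, ?> configs, boolean isKey) {
            delegate.configure(configs, isKey);
        }

        @Override
        public void close() {
            delegate.close();
        }
    }

As the Fetcher change in this commit shows (ensureExtended), a plain Deserializer is wrapped in ExtendedDeserializer.Wrapper, so the client can always invoke the headers-aware deserialize method.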


[2/2] kafka git commit: KAFKA-4208; Add Record Headers

Posted by jg...@apache.org.
KAFKA-4208; Add Record Headers

As per KIP-82

Adds a record headers API to ProducerRecord and ConsumerRecord
Adds support for converting headers between the protocol and the API in the Kafka Producer and the Kafka Fetcher (consumer)
Updates MirrorMaker, ConsoleConsumer and the Scala BaseConsumer
Adds RecordHeaders and RecordHeader, implementations of the Headers and Header interfaces

Some bits are reverted to being Java 7 compatible for the moment, until KIP-118 is implemented.
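
A minimal sketch of the new headers API from a client's point of view, based on the ProducerRecord, ConsumerRecord and Headers changes in this commit (the topic name, broker address, consumer group and header key below are assumptions):

    import java.util.Collections;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.header.Header;
    import org.apache.kafka.common.serialization.ByteArrayDeserializer;
    import org.apache.kafka.common.serialization.ByteArraySerializer;

    public class HeadersRoundTrip {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092"); // assumed broker

            try (KafkaProducer<byte[], byte[]> producer =
                     new KafkaProducer<>(props, new ByteArraySerializer(), new ByteArraySerializer())) {
                ProducerRecord<byte[], byte[]> record =
                    new ProducerRecord<>("my-topic", "key".getBytes(), "value".getBytes());
                // Headers stay mutable until the record is serialized for sending.
                record.headers().add("trace-id", "abc123".getBytes());
                producer.send(record);
            }

            props.put("group.id", "headers-demo");
            props.put("auto.offset.reset", "earliest");
            try (KafkaConsumer<byte[], byte[]> consumer =
                     new KafkaConsumer<>(props, new ByteArrayDeserializer(), new ByteArrayDeserializer())) {
                consumer.subscribe(Collections.singletonList("my-topic"));
                for (ConsumerRecord<byte[], byte[]> received : consumer.poll(5000)) {
                    // lastHeader returns the most recently added value for the key, or null.
                    Header header = received.headers().lastHeader("trace-id");
                    if (header != null)
                        System.out.println(new String(header.value()));
                }
            }
        }
    }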

Author: Michael Andre Pearce <Mi...@me.com>

Reviewers: Radai Rosenblatt <ra...@gmail.com>, Jiangjie Qin <be...@gmail.com>, Ismael Juma <is...@juma.me.uk>, Jason Gustafson <ja...@confluent.io>

Closes #2772 from michaelandrepearce/KIP-82


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/6185bc02
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/6185bc02
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/6185bc02

Branch: refs/heads/trunk
Commit: 6185bc0276c03075022c30d3c36f7f5c09ef19c6
Parents: ca6ae81
Author: Michael Andre Pearce <Mi...@me.com>
Authored: Fri Apr 28 19:17:57 2017 -0700
Committer: Jason Gustafson <ja...@confluent.io>
Committed: Fri Apr 28 19:18:27 2017 -0700

----------------------------------------------------------------------
 build.gradle                                    |   1 +
 checkstyle/import-control.xml                   |   7 +
 .../kafka/clients/consumer/ConsumerRecord.java  |  44 +++-
 .../clients/consumer/internals/Fetcher.java     |  24 +-
 .../kafka/clients/producer/KafkaProducer.java   |  47 ++--
 .../kafka/clients/producer/MockProducer.java    |  19 +-
 .../kafka/clients/producer/ProducerRecord.java  |  58 ++++-
 .../producer/internals/ProducerBatch.java       |   7 +-
 .../producer/internals/RecordAccumulator.java   |  19 +-
 .../org/apache/kafka/common/header/Header.java  |  25 +++
 .../org/apache/kafka/common/header/Headers.java |  72 ++++++
 .../common/header/internals/RecordHeader.java   |  79 +++++++
 .../common/header/internals/RecordHeaders.java  | 207 +++++++++++++++++
 .../record/AbstractLegacyRecordBatch.java       |   1 +
 .../kafka/common/record/AbstractRecords.java    |   5 +-
 .../kafka/common/record/DefaultRecord.java      |  16 +-
 .../kafka/common/record/DefaultRecordBatch.java |   1 +
 .../org/apache/kafka/common/record/Header.java  |  64 ------
 .../common/record/MemoryRecordsBuilder.java     |  30 ++-
 .../org/apache/kafka/common/record/Record.java  |   2 +
 .../kafka/common/record/SimpleRecord.java       |   1 +
 .../common/serialization/Deserializer.java      |   2 +-
 .../serialization/ExtendedDeserializer.java     |  56 +++++
 .../serialization/ExtendedSerializer.java       |  55 +++++
 .../kafka/common/serialization/Serializer.java  |   1 -
 .../org/apache/kafka/common/utils/Utils.java    |   9 +
 .../clients/consumer/ConsumerRecordTest.java    |   2 +
 .../clients/consumer/internals/FetcherTest.java |  47 ++++
 .../clients/producer/KafkaProducerTest.java     |  61 +++++
 .../clients/producer/ProducerRecordTest.java    |   2 +-
 .../internals/RecordAccumulatorTest.java        |  42 ++--
 .../clients/producer/internals/SenderTest.java  |  28 +--
 .../internals/TransactionManagerTest.java       |  13 +-
 .../header/internals/RecordHeadersTest.java     | 224 +++++++++++++++++++
 .../common/record/DefaultRecordBatchTest.java   |   6 +-
 .../kafka/common/record/DefaultRecordTest.java  |   9 +-
 .../scala/kafka/consumer/BaseConsumer.scala     |  11 +-
 .../scala/kafka/tools/ConsoleConsumer.scala     |   2 +-
 .../main/scala/kafka/tools/MirrorMaker.scala    |   4 +-
 .../kafka/api/PlaintextConsumerTest.scala       |  92 +++++++-
 40 files changed, 1216 insertions(+), 179 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/6185bc02/build.gradle
----------------------------------------------------------------------
diff --git a/build.gradle b/build.gradle
index 394baf6..7ed584f 100644
--- a/build.gradle
+++ b/build.gradle
@@ -754,6 +754,7 @@ project(':clients') {
     include "**/org/apache/kafka/clients/producer/*"
     include "**/org/apache/kafka/common/*"
     include "**/org/apache/kafka/common/errors/*"
+    include "**/org/apache/kafka/common/header/*"
     include "**/org/apache/kafka/common/serialization/*"
     include "**/org/apache/kafka/common/config/*"
   }

http://git-wip-us.apache.org/repos/asf/kafka/blob/6185bc02/checkstyle/import-control.xml
----------------------------------------------------------------------
diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index a6de9a7..d7851a5 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -93,6 +93,7 @@
 
     <subpackage name="record">
       <allow pkg="net.jpountz" />
+      <allow pkg="org.apache.kafka.common.header" />
       <allow pkg="org.apache.kafka.common.record" />
       <allow pkg="org.apache.kafka.common.network" />
       <allow pkg="org.apache.kafka.common.protocol" />
@@ -100,6 +101,11 @@
       <allow pkg="org.apache.kafka.common.errors" />
     </subpackage>
 
+    <subpackage name="header">
+      <allow pkg="org.apache.kafka.common.header" />
+      <allow pkg="org.apache.kafka.common.record" />
+    </subpackage>
+
     <subpackage name="requests">
       <allow pkg="org.apache.kafka.common.protocol" />
       <allow pkg="org.apache.kafka.common.network" />
@@ -110,6 +116,7 @@
 
     <subpackage name="serialization">
       <allow class="org.apache.kafka.common.errors.SerializationException" />
+      <allow class="org.apache.kafka.common.header.Headers" />
     </subpackage>
   </subpackage>
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/6185bc02/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecord.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecord.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecord.java
index fe9ede8..464091a 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecord.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecord.java
@@ -16,6 +16,8 @@
  */
 package org.apache.kafka.clients.consumer;
 
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.header.internals.RecordHeaders;
 import org.apache.kafka.common.record.RecordBatch;
 import org.apache.kafka.common.record.TimestampType;
 
@@ -37,6 +39,7 @@ public class ConsumerRecord<K, V> {
     private final long checksum;
     private final int serializedKeySize;
     private final int serializedValueSize;
+    private final Headers headers;
     private final K key;
     private final V value;
 
@@ -62,7 +65,8 @@ public class ConsumerRecord<K, V> {
 
 
     /**
-     * Creates a record to be received from a specified topic and partition
+     * Creates a record to be received from a specified topic and partition (provided for
+     * compatibility with Kafka 0.10 before the message format supported headers).
      *
      * @param topic The topic this record is received from
      * @param partition The partition of the topic this record is received from
@@ -85,6 +89,35 @@ public class ConsumerRecord<K, V> {
                           int serializedValueSize,
                           K key,
                           V value) {
+        this(topic, partition, offset, timestamp, timestampType, checksum, serializedKeySize, serializedValueSize, key, value, new RecordHeaders());
+    }
+
+    /**
+     * Creates a record to be received from a specified topic and partition
+     *
+     * @param topic The topic this record is received from
+     * @param partition The partition of the topic this record is received from
+     * @param offset The offset of this record in the corresponding Kafka partition
+     * @param timestamp The timestamp of the record.
+     * @param timestampType The timestamp type
+     * @param checksum The checksum (CRC32) of the full record
+     * @param serializedKeySize The length of the serialized key
+     * @param serializedValueSize The length of the serialized value
+     * @param key The key of the record, if one exists (null is allowed)
+     * @param value The record contents
+     * @param headers The headers of the record.
+     */
+    public ConsumerRecord(String topic,
+                          int partition,
+                          long offset,
+                          long timestamp,
+                          TimestampType timestampType,
+                          long checksum,
+                          int serializedKeySize,
+                          int serializedValueSize,
+                          K key,
+                          V value,
+                          Headers headers) {
         if (topic == null)
             throw new IllegalArgumentException("Topic cannot be null");
         this.topic = topic;
@@ -97,6 +130,7 @@ public class ConsumerRecord<K, V> {
         this.serializedValueSize = serializedValueSize;
         this.key = key;
         this.value = value;
+        this.headers = headers;
     }
 
     /**
@@ -114,6 +148,13 @@ public class ConsumerRecord<K, V> {
     }
 
     /**
+     * The headers
+     */
+    public Headers headers() {
+        return headers;
+    }
+    
+    /**
      * The key (or null if no key is specified)
      */
     public K key() {
@@ -177,6 +218,7 @@ public class ConsumerRecord<K, V> {
                + ", " + timestampType + " = " + timestamp + ", checksum = " + checksum
                + ", serialized key size = "  + serializedKeySize
                + ", serialized value size = " + serializedValueSize
+               + ", headers = " + headers
                + ", key = " + key + ", value = " + value + ")";
     }
 }
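
A short sketch of the new eleven-argument constructor documented above; the concrete values are made up for illustration:

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.common.header.internals.RecordHeader;
    import org.apache.kafka.common.header.internals.RecordHeaders;
    import org.apache.kafka.common.record.TimestampType;

    public class ConsumerRecordWithHeaders {
        public static void main(String[] args) {
            RecordHeaders headers = new RecordHeaders();
            headers.add(new RecordHeader("trace-id", "abc123".getBytes()));

            // topic, partition, offset, timestamp, timestampType, checksum,
            // serializedKeySize, serializedValueSize, key, value, headers
            ConsumerRecord<String, String> record = new ConsumerRecord<>(
                    "my-topic", 0, 42L,
                    System.currentTimeMillis(), TimestampType.CREATE_TIME,
                    0L, 3, 5, "key", "value", headers);

            System.out.println(new String(record.headers().lastHeader("trace-id").value()));
        }
    }

The old ten-argument constructor remains and simply supplies an empty RecordHeaders, so existing callers compile unchanged.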

http://git-wip-us.apache.org/repos/asf/kafka/blob/6185bc02/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
index 947214f..0c5c385 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
@@ -37,6 +37,8 @@ import org.apache.kafka.common.errors.RetriableException;
 import org.apache.kafka.common.errors.SerializationException;
 import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.errors.TopicAuthorizationException;
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.header.internals.RecordHeaders;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.metrics.stats.Avg;
@@ -58,6 +60,7 @@ import org.apache.kafka.common.requests.ListOffsetResponse;
 import org.apache.kafka.common.requests.MetadataRequest;
 import org.apache.kafka.common.requests.MetadataResponse;
 import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.ExtendedDeserializer;
 import org.apache.kafka.common.utils.CloseableIterator;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
@@ -103,8 +106,9 @@ public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable {
     private final FetchManagerMetrics sensors;
     private final SubscriptionState subscriptions;
     private final ConcurrentLinkedQueue<CompletedFetch> completedFetches;
-    private final Deserializer<K> keyDeserializer;
-    private final Deserializer<V> valueDeserializer;
+
+    private final ExtendedDeserializer<K> keyDeserializer;
+    private final ExtendedDeserializer<V> valueDeserializer;
     private final IsolationLevel isolationLevel;
 
     private PartitionRecords nextInLineRecords = null;
@@ -136,8 +140,8 @@ public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable {
         this.fetchSize = fetchSize;
         this.maxPollRecords = maxPollRecords;
         this.checkCrcs = checkCrcs;
-        this.keyDeserializer = keyDeserializer;
-        this.valueDeserializer = valueDeserializer;
+        this.keyDeserializer = ensureExtended(keyDeserializer);
+        this.valueDeserializer = ensureExtended(valueDeserializer);
         this.completedFetches = new ConcurrentLinkedQueue<>();
         this.sensors = new FetchManagerMetrics(metrics, metricGrpPrefix);
         this.retryBackoffMs = retryBackoffMs;
@@ -146,6 +150,10 @@ public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable {
         subscriptions.addListener(this);
     }
 
+    private <T> ExtendedDeserializer<T> ensureExtended(Deserializer<T> deserializer) {
+        return deserializer instanceof ExtendedDeserializer ? (ExtendedDeserializer<T>) deserializer : new ExtendedDeserializer.Wrapper<>(deserializer);
+    }
+
     /**
      * Represents data about an offset returned by a broker.
      */
@@ -894,18 +902,18 @@ public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable {
             long offset = record.offset();
             long timestamp = record.timestamp();
             TimestampType timestampType = batch.timestampType();
+            Headers headers = new RecordHeaders(record.headers());
             ByteBuffer keyBytes = record.key();
             byte[] keyByteArray = keyBytes == null ? null : Utils.toArray(keyBytes);
-            K key = keyBytes == null ? null : this.keyDeserializer.deserialize(partition.topic(), keyByteArray);
+            K key = keyBytes == null ? null : this.keyDeserializer.deserialize(partition.topic(), headers, keyByteArray);
             ByteBuffer valueBytes = record.value();
             byte[] valueByteArray = valueBytes == null ? null : Utils.toArray(valueBytes);
-            V value = valueBytes == null ? null : this.valueDeserializer.deserialize(partition.topic(), valueByteArray);
-
+            V value = valueBytes == null ? null : this.valueDeserializer.deserialize(partition.topic(), headers, valueByteArray);
             return new ConsumerRecord<>(partition.topic(), partition.partition(), offset,
                                         timestamp, timestampType, record.checksum(),
                                         keyByteArray == null ? ConsumerRecord.NULL_SIZE : keyByteArray.length,
                                         valueByteArray == null ? ConsumerRecord.NULL_SIZE : valueByteArray.length,
-                                        key, value);
+                                        key, value, headers);
         } catch (RuntimeException e) {
             throw new SerializationException("Error deserializing key/value for partition " + partition +
                     " at offset " + record.offset(), e);

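With ensureExtended above, the fetcher can always invoke the three-argument deserialize(), wrapping plain Deserializers in ExtendedDeserializer.Wrapper. A hypothetical header-aware deserializer (the "charset" header name is invented for this sketch) could look like:

    import java.nio.charset.Charset;
    import java.nio.charset.StandardCharsets;
    import java.util.Map;

    import org.apache.kafka.common.header.Header;
    import org.apache.kafka.common.header.Headers;
    import org.apache.kafka.common.serialization.ExtendedDeserializer;

    public class CharsetAwareDeserializer implements ExtendedDeserializer<String> {

        @Override
        public void configure(Map<String, ?> configs, boolean isKey) { }

        @Override
        public String deserialize(String topic, byte[] data) {
            // The two-argument form simply has no headers to consult.
            return deserialize(topic, null, data);
        }

        @Override
        public String deserialize(String topic, Headers headers, byte[] data) {
            if (data == null)
                return null;
            Charset charset = StandardCharsets.UTF_8;
            if (headers != null) {
                Header h = headers.lastHeader("charset");
                if (h != null && h.value() != null)
                    charset = Charset.forName(new String(h.value(), StandardCharsets.UTF_8));
            }
            return new String(data, charset);
        }

        @Override
        public void close() { }
    }
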
http://git-wip-us.apache.org/repos/asf/kafka/blob/6185bc02/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index e0d9938..deca51f 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -40,6 +40,9 @@ import org.apache.kafka.common.errors.RecordTooLargeException;
 import org.apache.kafka.common.errors.SerializationException;
 import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.errors.TopicAuthorizationException;
+import org.apache.kafka.common.header.Header;
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.header.internals.RecordHeaders;
 import org.apache.kafka.common.internals.ClusterResourceListeners;
 import org.apache.kafka.common.metrics.JmxReporter;
 import org.apache.kafka.common.metrics.MetricConfig;
@@ -51,6 +54,7 @@ import org.apache.kafka.common.network.Selector;
 import org.apache.kafka.common.record.AbstractRecords;
 import org.apache.kafka.common.record.CompressionType;
 import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.common.serialization.ExtendedSerializer;
 import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.common.utils.AppInfoParser;
 import org.apache.kafka.common.utils.KafkaThread;
@@ -156,8 +160,8 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
     private final CompressionType compressionType;
     private final Sensor errors;
     private final Time time;
-    private final Serializer<K> keySerializer;
-    private final Serializer<V> valueSerializer;
+    private final ExtendedSerializer<K> keySerializer;
+    private final ExtendedSerializer<V> valueSerializer;
     private final ProducerConfig producerConfig;
     private final long maxBlockTimeMs;
     private final int requestTimeoutMs;
@@ -238,21 +242,22 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
             this.partitioner = config.getConfiguredInstance(ProducerConfig.PARTITIONER_CLASS_CONFIG, Partitioner.class);
             long retryBackoffMs = config.getLong(ProducerConfig.RETRY_BACKOFF_MS_CONFIG);
             if (keySerializer == null) {
-                this.keySerializer = config.getConfiguredInstance(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
-                        Serializer.class);
+                this.keySerializer = ensureExtended(config.getConfiguredInstance(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
+                                                                                         Serializer.class));
                 this.keySerializer.configure(config.originals(), true);
             } else {
                 config.ignore(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG);
-                this.keySerializer = keySerializer;
+                this.keySerializer = ensureExtended(keySerializer);
             }
             if (valueSerializer == null) {
-                this.valueSerializer = config.getConfiguredInstance(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
-                        Serializer.class);
+                this.valueSerializer = ensureExtended(config.getConfiguredInstance(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
+                                                                                           Serializer.class));
                 this.valueSerializer.configure(config.originals(), false);
             } else {
                 config.ignore(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG);
-                this.valueSerializer = valueSerializer;
+                this.valueSerializer = ensureExtended(valueSerializer);
             }
+
 
             // load interceptors and make sure they get clientId
             userProvidedConfigs.put(ProducerConfig.CLIENT_ID_CONFIG, clientId);
@@ -326,6 +331,10 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
         }
     }
 
+    private <T> ExtendedSerializer<T> ensureExtended(Serializer<T> serializer) {
+        return serializer instanceof ExtendedSerializer ? (ExtendedSerializer<T>) serializer : new ExtendedSerializer.Wrapper<>(serializer);
+    }
+
     private static long configureMaxBlockTime(ProducerConfig config, Map<String, Object> userProvidedConfigs) {
         /* check for user defined settings.
          * If the BLOCK_ON_BUFFER_FULL is set to true,we do not honor METADATA_FETCH_TIMEOUT_CONFIG.
@@ -641,7 +650,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
             Cluster cluster = clusterAndWaitTime.cluster;
             byte[] serializedKey;
             try {
-                serializedKey = keySerializer.serialize(record.topic(), record.key());
+                serializedKey = keySerializer.serialize(record.topic(), record.headers(), record.key());
             } catch (ClassCastException cce) {
                 throw new SerializationException("Can't convert key of class " + record.key().getClass().getName() +
                         " to class " + producerConfig.getClass(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG).getName() +
@@ -649,16 +658,19 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
             }
             byte[] serializedValue;
             try {
-                serializedValue = valueSerializer.serialize(record.topic(), record.value());
+                serializedValue = valueSerializer.serialize(record.topic(), record.headers(), record.value());
             } catch (ClassCastException cce) {
                 throw new SerializationException("Can't convert value of class " + record.value().getClass().getName() +
                         " to class " + producerConfig.getClass(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG).getName() +
                         " specified in value.serializer");
             }
-
             int partition = partition(record, serializedKey, serializedValue, cluster);
+
+            setReadOnly(record.headers());
+            Header[] headers = record.headers().toArray();
+
             int serializedSize = AbstractRecords.sizeInBytesUpperBound(apiVersions.maxUsableProduceMagic(),
-                    serializedKey, serializedValue);
+                    serializedKey, serializedValue, headers);
             ensureValidRecordSize(serializedSize);
             tp = new TopicPartition(record.topic(), partition);
             long timestamp = record.timestamp() == null ? time.milliseconds() : record.timestamp();
@@ -670,7 +682,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
                 transactionManager.maybeAddPartitionToTransaction(tp);
 
             RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey,
-                    serializedValue, interceptCallback, remainingWaitMs);
+                    serializedValue, headers, interceptCallback, remainingWaitMs);
             if (result.batchIsFull || result.newBatchCreated) {
                 log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition);
                 this.sender.wakeup();
@@ -723,7 +735,8 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
 
         if (transactionManager.isInTransaction()) {
             if (transactionManager.isInErrorState()) {
-                String errorMessage = "Cannot perform a transactional send because at least one previous transactional request has failed with errors.";
+                String errorMessage =
+                    "Cannot perform a transactional send because at least one previous transactional request has failed with errors.";
                 Exception lastError = transactionManager.lastError();
                 if (lastError != null)
                     throw new KafkaException(errorMessage, lastError);
@@ -735,6 +748,12 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
         }
     }
 
+    private void setReadOnly(Headers headers) {
+        if (headers instanceof RecordHeaders) {
+            ((RecordHeaders) headers).setReadOnly();
+        }
+    }
+
     /**
      * Wait for cluster metadata including partitions for the given topic to be available.
      * @param topic The topic we want metadata for

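Note the ordering in doSend() above: serialization runs before setReadOnly(), so a serializer may still add headers. A hypothetical sketch (the "format-version" header is invented):

    import java.nio.charset.StandardCharsets;
    import java.util.Map;

    import org.apache.kafka.common.header.Headers;
    import org.apache.kafka.common.serialization.ExtendedSerializer;

    public class VersionStampingSerializer implements ExtendedSerializer<String> {

        @Override
        public void configure(Map<String, ?> configs, boolean isKey) { }

        @Override
        public byte[] serialize(String topic, String data) {
            return serialize(topic, null, data);
        }

        @Override
        public byte[] serialize(String topic, Headers headers, String data) {
            // Headers are still mutable here; doSend() calls setReadOnly()
            // only after serialization and partitioning.
            if (headers != null)
                headers.add("format-version", "1".getBytes(StandardCharsets.UTF_8));
            return data == null ? null : data.getBytes(StandardCharsets.UTF_8);
        }

        @Override
        public void close() { }
    }
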
http://git-wip-us.apache.org/repos/asf/kafka/blob/6185bc02/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java
index a4f59ac..1b4151c 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java
@@ -27,6 +27,7 @@ import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.ProducerFencedException;
 import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.common.serialization.ExtendedSerializer;
 import org.apache.kafka.common.serialization.Serializer;
 
 import java.util.ArrayDeque;
@@ -53,9 +54,9 @@ public class MockProducer<K, V> implements Producer<K, V> {
     private final Deque<Completion> completions;
     private boolean autoComplete;
     private Map<TopicPartition, Long> offsets;
-    private final Serializer<K> keySerializer;
-    private final Serializer<V> valueSerializer;
     private boolean closed;
+    private final ExtendedSerializer<K> keySerializer;
+    private final ExtendedSerializer<V> valueSerializer;
 
     /**
      * Create a mock producer
@@ -77,8 +78,8 @@ public class MockProducer<K, V> implements Producer<K, V> {
         this.cluster = cluster;
         this.autoComplete = autoComplete;
         this.partitioner = partitioner;
-        this.keySerializer = keySerializer;
-        this.valueSerializer = valueSerializer;
+        this.keySerializer = ensureExtended(keySerializer);
+        this.valueSerializer = ensureExtended(valueSerializer);
         this.offsets = new HashMap<TopicPartition, Long>();
         this.sent = new ArrayList<ProducerRecord<K, V>>();
         this.completions = new ArrayDeque<Completion>();
@@ -134,7 +135,11 @@ public class MockProducer<K, V> implements Producer<K, V> {
     }
 
     public void abortTransaction() throws ProducerFencedException {
-
+
+    }
+
+    private <T> ExtendedSerializer<T> ensureExtended(Serializer<T> serializer) {
+        return serializer instanceof ExtendedSerializer ? (ExtendedSerializer<T>) serializer : new ExtendedSerializer.Wrapper<>(serializer);
     }
 
     /**
@@ -273,8 +278,8 @@ public class MockProducer<K, V> implements Producer<K, V> {
                                                    + "].");
             return partition;
         }
-        byte[] keyBytes = keySerializer.serialize(topic, record.key());
-        byte[] valueBytes = valueSerializer.serialize(topic, record.value());
+        byte[] keyBytes = keySerializer.serialize(topic, record.headers(), record.key());
+        byte[] valueBytes = valueSerializer.serialize(topic, record.headers(), record.value());
         return this.partitioner.partition(topic, record.key(), keyBytes, record.value(), valueBytes, cluster);
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/6185bc02/clients/src/main/java/org/apache/kafka/clients/producer/ProducerRecord.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerRecord.java b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerRecord.java
index df89616..85428e5 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerRecord.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerRecord.java
@@ -16,6 +16,10 @@
  */
 package org.apache.kafka.clients.producer;
 
+import org.apache.kafka.common.header.Header;
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.header.internals.RecordHeaders;
+
 /**
  * A key/value pair to be sent to Kafka. This consists of a topic name to which the record is being sent, an optional
  * partition number, and an optional key and value.
@@ -44,6 +48,7 @@ public class ProducerRecord<K, V> {
 
     private final String topic;
     private final Integer partition;
+    private final Headers headers;
     private final K key;
     private final V value;
     private final Long timestamp;
@@ -56,8 +61,9 @@ public class ProducerRecord<K, V> {
      * @param timestamp The timestamp of the record
      * @param key The key that will be included in the record
      * @param value The record contents
+     * @param headers The headers that will be included in the record
      */
-    public ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value) {
+    public ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value, Iterable<Header> headers) {
         if (topic == null)
             throw new IllegalArgumentException("Topic cannot be null.");
         if (timestamp != null && timestamp < 0)
@@ -71,9 +77,36 @@ public class ProducerRecord<K, V> {
         this.key = key;
         this.value = value;
         this.timestamp = timestamp;
+        this.headers = new RecordHeaders(headers);
     }
 
     /**
+     * Creates a record with a specified timestamp to be sent to a specified topic and partition
+     *
+     * @param topic The topic the record will be appended to
+     * @param partition The partition to which the record should be sent
+     * @param timestamp The timestamp of the record
+     * @param key The key that will be included in the record
+     * @param value The record contents
+     */
+    public ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value) {
+        this(topic, partition, timestamp, key, value, null);
+    }
+
+    /**
+     * Creates a record to be sent to a specified topic and partition
+     *
+     * @param topic The topic the record will be appended to
+     * @param partition The partition to which the record should be sent
+     * @param key The key that will be included in the record
+     * @param value The record contents
+     * @param headers The headers that will be included in the record
+     */
+    public ProducerRecord(String topic, Integer partition, K key, V value, Iterable<Header> headers) {
+        this(topic, partition, null, key, value, headers);
+    }
+
+    /**
      * Creates a record to be sent to a specified topic and partition
      *
      * @param topic The topic the record will be appended to
@@ -82,9 +115,9 @@ public class ProducerRecord<K, V> {
      * @param value The record contents
      */
     public ProducerRecord(String topic, Integer partition, K key, V value) {
-        this(topic, partition, null, key, value);
+        this(topic, partition, null, key, value, null);
     }
-
+
     /**
      * Create a record to be sent to Kafka
      * 
@@ -93,9 +126,9 @@ public class ProducerRecord<K, V> {
      * @param value The record contents
      */
     public ProducerRecord(String topic, K key, V value) {
-        this(topic, null, null, key, value);
+        this(topic, null, null, key, value, null);
     }
-
+
     /**
      * Create a record with no key
      * 
@@ -103,7 +136,7 @@ public class ProducerRecord<K, V> {
      * @param value The record contents
      */
     public ProducerRecord(String topic, V value) {
-        this(topic, null, null, null, value);
+        this(topic, null, null, null, value, null);
     }
 
     /**
@@ -114,6 +147,13 @@ public class ProducerRecord<K, V> {
     }
 
     /**
+     * @return The headers
+     */
+    public Headers headers() {
+        return headers;
+    }
+
+    /**
      * @return The key (or null if no key is specified)
      */
     public K key() {
@@ -143,10 +183,11 @@ public class ProducerRecord<K, V> {
 
     @Override
     public String toString() {
+        String headers = this.headers == null ? "null" : this.headers.toString();
         String key = this.key == null ? "null" : this.key.toString();
         String value = this.value == null ? "null" : this.value.toString();
         String timestamp = this.timestamp == null ? "null" : this.timestamp.toString();
-        return "ProducerRecord(topic=" + topic + ", partition=" + partition + ", key=" + key + ", value=" + value +
+        return "ProducerRecord(topic=" + topic + ", partition=" + partition + ", headers=" + headers + ", key=" + key + ", value=" + value +
             ", timestamp=" + timestamp + ")";
     }
 
@@ -165,6 +206,8 @@ public class ProducerRecord<K, V> {
             return false;
         else if (topic != null ? !topic.equals(that.topic) : that.topic != null) 
             return false;
+        else if (headers != null ? !headers.equals(that.headers) : that.headers != null)
+            return false;
         else if (value != null ? !value.equals(that.value) : that.value != null) 
             return false;
         else if (timestamp != null ? !timestamp.equals(that.timestamp) : that.timestamp != null)
@@ -177,6 +220,7 @@ public class ProducerRecord<K, V> {
     public int hashCode() {
         int result = topic != null ? topic.hashCode() : 0;
         result = 31 * result + (partition != null ? partition.hashCode() : 0);
+        result = 31 * result + (headers != null ? headers.hashCode() : 0);
         result = 31 * result + (key != null ? key.hashCode() : 0);
         result = 31 * result + (value != null ? value.hashCode() : 0);
         result = 31 * result + (timestamp != null ? timestamp.hashCode() : 0);

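A short usage sketch of the new ProducerRecord constructors (topic and header names are made up):

    import java.nio.charset.StandardCharsets;
    import java.util.Arrays;

    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.header.Header;
    import org.apache.kafka.common.header.internals.RecordHeader;

    public class ProducerRecordHeadersSketch {
        public static void main(String[] args) {
            // Headers can be supplied up front via the new Iterable<Header> constructor...
            Iterable<Header> headers = Arrays.<Header>asList(
                    new RecordHeader("trace-id", "abc-123".getBytes(StandardCharsets.UTF_8)));
            ProducerRecord<String, String> record =
                    new ProducerRecord<>("my-topic", null, "key", "value", headers);

            // ...or added afterwards, since headers() stays mutable until send().
            record.headers().add("origin", "web".getBytes(StandardCharsets.UTF_8));

            System.out.println(record);
        }
    }
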
http://git-wip-us.apache.org/repos/asf/kafka/blob/6185bc02/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java
index eba3078..6d5ca15 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java
@@ -20,6 +20,7 @@ import org.apache.kafka.clients.producer.Callback;
 import org.apache.kafka.clients.producer.RecordMetadata;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.header.Header;
 import org.apache.kafka.common.record.AbstractRecords;
 import org.apache.kafka.common.record.RecordBatch;
 import org.apache.kafka.common.record.MemoryRecords;
@@ -75,12 +76,12 @@ public final class ProducerBatch {
      *
      * @return The RecordSend corresponding to this record or null if there isn't sufficient room.
      */
-    public FutureRecordMetadata tryAppend(long timestamp, byte[] key, byte[] value, Callback callback, long now) {
+    public FutureRecordMetadata tryAppend(long timestamp, byte[] key, byte[] value, Header[] headers, Callback callback, long now) {
         if (!recordsBuilder.hasRoomFor(timestamp, key, value)) {
             return null;
         } else {
-            long checksum = this.recordsBuilder.append(timestamp, key, value);
-            this.maxRecordSize = Math.max(this.maxRecordSize, AbstractRecords.sizeInBytesUpperBound(magic(), key, value));
+            long checksum = this.recordsBuilder.append(timestamp, key, value, headers);
+            this.maxRecordSize = Math.max(this.maxRecordSize, AbstractRecords.sizeInBytesUpperBound(magic(), key, value, headers));
             this.lastAppendTime = now;
             FutureRecordMetadata future = new FutureRecordMetadata(this.produceFuture, this.recordCount,
                                                                    timestamp, checksum,

http://git-wip-us.apache.org/repos/asf/kafka/blob/6185bc02/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
index aa4d9d3..dcbf691 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
@@ -24,6 +24,7 @@ import org.apache.kafka.common.Node;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.UnsupportedVersionException;
+import org.apache.kafka.common.header.Header;
 import org.apache.kafka.common.metrics.Measurable;
 import org.apache.kafka.common.metrics.MetricConfig;
 import org.apache.kafka.common.metrics.Metrics;
@@ -31,9 +32,10 @@ import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.metrics.stats.Rate;
 import org.apache.kafka.common.record.AbstractRecords;
 import org.apache.kafka.common.record.CompressionType;
+import org.apache.kafka.common.record.Record;
+import org.apache.kafka.common.record.RecordBatch;
 import org.apache.kafka.common.record.MemoryRecords;
 import org.apache.kafka.common.record.MemoryRecordsBuilder;
-import org.apache.kafka.common.record.RecordBatch;
 import org.apache.kafka.common.record.TimestampType;
 import org.apache.kafka.common.utils.CopyOnWriteMap;
 import org.apache.kafka.common.utils.Time;
@@ -168,6 +170,7 @@ public final class RecordAccumulator {
      * @param timestamp The timestamp of the record
      * @param key The key for the record
      * @param value The value for the record
+     * @param headers The headers for the record
      * @param callback The user-supplied callback to execute when the request is complete
      * @param maxTimeToBlock The maximum time in milliseconds to block for buffer memory to be available
      */
@@ -175,26 +178,28 @@ public final class RecordAccumulator {
                                      long timestamp,
                                      byte[] key,
                                      byte[] value,
+                                     Header[] headers,
                                      Callback callback,
                                      long maxTimeToBlock) throws InterruptedException {
        // We keep track of the number of appending threads to make sure we do not miss batches in
         // abortIncompleteBatches().
         appendsInProgress.incrementAndGet();
         ByteBuffer buffer = null;
+        if (headers == null) headers = Record.EMPTY_HEADERS;
         try {
             // check if we have an in-progress batch
             Deque<ProducerBatch> dq = getOrCreateDeque(tp);
             synchronized (dq) {
                 if (closed)
                     throw new IllegalStateException("Cannot send after the producer is closed.");
-                RecordAppendResult appendResult = tryAppend(timestamp, key, value, callback, dq);
+                RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callback, dq);
                 if (appendResult != null)
                     return appendResult;
             }
 
             // we don't have an in-progress record batch try to allocate a new batch
             byte maxUsableMagic = apiVersions.maxUsableProduceMagic();
-            int size = Math.max(this.batchSize, AbstractRecords.sizeInBytesUpperBound(maxUsableMagic, key, value));
+            int size = Math.max(this.batchSize, AbstractRecords.sizeInBytesUpperBound(maxUsableMagic, key, value, headers));
             log.trace("Allocating a new {} byte message buffer for topic {} partition {}", size, tp.topic(), tp.partition());
             buffer = free.allocate(size, maxTimeToBlock);
             synchronized (dq) {
@@ -202,7 +207,7 @@ public final class RecordAccumulator {
                 if (closed)
                     throw new IllegalStateException("Cannot send after the producer is closed.");
 
-                RecordAppendResult appendResult = tryAppend(timestamp, key, value, callback, dq);
+                RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callback, dq);
                 if (appendResult != null) {
                     // Somebody else found us a batch, return the one we waited for! Hopefully this doesn't happen often...
                     return appendResult;
@@ -210,7 +215,7 @@ public final class RecordAccumulator {
 
                 MemoryRecordsBuilder recordsBuilder = recordsBuilder(buffer, maxUsableMagic);
                 ProducerBatch batch = new ProducerBatch(tp, recordsBuilder, time.milliseconds());
-                FutureRecordMetadata future = Utils.notNull(batch.tryAppend(timestamp, key, value, callback, time.milliseconds()));
+                FutureRecordMetadata future = Utils.notNull(batch.tryAppend(timestamp, key, value, headers, callback, time.milliseconds()));
 
                 dq.addLast(batch);
                 incomplete.add(batch);
@@ -246,10 +251,10 @@ public final class RecordAccumulator {
      *  and memory records built) in one of the following cases (whichever comes first): right before send,
      *  if it is expired, or when the producer is closed.
      */
-    private RecordAppendResult tryAppend(long timestamp, byte[] key, byte[] value, Callback callback, Deque<ProducerBatch> deque) {
+    private RecordAppendResult tryAppend(long timestamp, byte[] key, byte[] value, Header[] headers, Callback callback, Deque<ProducerBatch> deque) {
         ProducerBatch last = deque.peekLast();
         if (last != null) {
-            FutureRecordMetadata future = last.tryAppend(timestamp, key, value, callback, time.milliseconds());
+            FutureRecordMetadata future = last.tryAppend(timestamp, key, value, headers, callback, time.milliseconds());
             if (future == null)
                 last.closeForRecordAppends();
             else

http://git-wip-us.apache.org/repos/asf/kafka/blob/6185bc02/clients/src/main/java/org/apache/kafka/common/header/Header.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/header/Header.java b/clients/src/main/java/org/apache/kafka/common/header/Header.java
new file mode 100644
index 0000000..58869b4
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/header/Header.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.header;
+
+public interface Header {
+
+    String key();
+
+    byte[] value();
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/6185bc02/clients/src/main/java/org/apache/kafka/common/header/Headers.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/header/Headers.java b/clients/src/main/java/org/apache/kafka/common/header/Headers.java
new file mode 100644
index 0000000..1796d62
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/header/Headers.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.header;
+
+public interface Headers extends Iterable<Header> {
+
+    /**
+     * Adds a header to the end of the headers list.
+     *
+     * @param header the Header to be added
+     * @return this instance of the Headers, to allow chaining
+     * @throws IllegalStateException if the headers are in a read-only state
+     */
+    Headers add(Header header) throws IllegalStateException;
+
+    /**
+     * Creates a header from the given key and value and adds it to the end of the headers list.
+     *
+     * @param key the key of the header to be added
+     * @param value the value of the header to be added
+     * @return this instance of the Headers, to allow chaining
+     * @throws IllegalStateException if the headers are in a read-only state
+     */
+    Headers add(String key, byte[] value) throws IllegalStateException;
+
+    /**
+     * Removes all headers with the given key.
+     *
+     * @param key the key to remove all headers for
+     * @return this instance of the Headers, to allow chaining
+     * @throws IllegalStateException if the headers are in a read-only state
+     */
+    Headers remove(String key) throws IllegalStateException;
+
+    /**
+     * Returns the last header for the given key, if present.
+     *
+     * @param key the key to get the last header for
+     * @return the last header matching the given key, or null if none is present
+     */
+    Header lastHeader(String key);
+
+    /**
+     * Returns all headers for the given key, in the order they were added.
+     *
+     * @param key the key to return the headers for
+     * @return all headers for the given key, in insertion order; an empty iterable if no headers are present
+     */
+    Iterable<Header> headers(String key);
+
+    /**
+     * Returns all headers as an array, in the order they were added.
+     *
+     * @return the headers as a Header[]; mutating the array does not affect the Headers, and an empty array is returned if no headers are present
+     */
+    Header[] toArray();
+
+}

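A sketch exercising the Headers contract defined above (keys and values are made up):

    import java.nio.charset.StandardCharsets;

    import org.apache.kafka.common.header.Header;
    import org.apache.kafka.common.header.Headers;
    import org.apache.kafka.common.header.internals.RecordHeaders;

    public class HeadersContractSketch {
        public static void main(String[] args) {
            Headers headers = new RecordHeaders();
            headers.add("retries", "1".getBytes(StandardCharsets.UTF_8))
                   .add("retries", "2".getBytes(StandardCharsets.UTF_8))
                   .add("source", "svc-a".getBytes(StandardCharsets.UTF_8));

            // lastHeader returns the most recently added value for the key.
            Header last = headers.lastHeader("retries");
            System.out.println(new String(last.value(), StandardCharsets.UTF_8)); // prints 2

            // headers(key) iterates every value for the key, in insertion order.
            for (Header h : headers.headers("retries"))
                System.out.println(new String(h.value(), StandardCharsets.UTF_8));

            // remove drops all headers with the key; toArray snapshots what is left.
            headers.remove("retries");
            System.out.println(headers.toArray().length); // prints 1
        }
    }
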
http://git-wip-us.apache.org/repos/asf/kafka/blob/6185bc02/clients/src/main/java/org/apache/kafka/common/header/internals/RecordHeader.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/header/internals/RecordHeader.java b/clients/src/main/java/org/apache/kafka/common/header/internals/RecordHeader.java
new file mode 100644
index 0000000..a6c5375
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/header/internals/RecordHeader.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.header.internals;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Objects;
+
+import org.apache.kafka.common.header.Header;
+import org.apache.kafka.common.utils.Utils;
+
+public class RecordHeader implements Header {
+    private final String key;
+    private ByteBuffer valueBuffer;
+    private byte[] value;
+
+    public RecordHeader(String key, byte[] value) {
+        Objects.requireNonNull(key, "Null header keys are not permitted");
+        this.key = key;
+        this.value = value;
+    }
+
+    public RecordHeader(String key, ByteBuffer valueBuffer) {
+        Objects.requireNonNull(key, "Null header keys are not permitted");
+        this.key = key;
+        this.valueBuffer = valueBuffer;
+    }
+
+    public String key() {
+        return key;
+    }
+
+    public byte[] value() {
+        if (value == null && valueBuffer != null) {
+            value = Utils.toArray(valueBuffer);
+            valueBuffer = null;
+        }
+        return value;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o)
+            return true;
+        if (o == null || getClass() != o.getClass())
+            return false;
+
+        RecordHeader header = (RecordHeader) o;
+        return (key == null ? header.key == null : key.equals(header.key)) && 
+               Arrays.equals(value(), header.value());
+    }
+
+    @Override
+    public int hashCode() {
+        int result = key != null ? key.hashCode() : 0;
+        result = 31 * result + Arrays.hashCode(value());
+        return result;
+    }
+
+    @Override
+    public String toString() {
+        return "RecordHeader(key = " + key + ", value = " + Arrays.toString(value()) + ")";
+    }
+
+}

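A small sketch of RecordHeader's two construction paths; the ByteBuffer form defers the copy until value() is first called:

    import java.nio.ByteBuffer;
    import java.nio.charset.StandardCharsets;

    import org.apache.kafka.common.header.internals.RecordHeader;

    public class RecordHeaderSketch {
        public static void main(String[] args) {
            byte[] payload = "v1".getBytes(StandardCharsets.UTF_8);

            // Built from a ByteBuffer (as when reading from the log), the byte[]
            // is materialized lazily on the first value() call.
            RecordHeader fromBuffer = new RecordHeader("k", ByteBuffer.wrap(payload));
            RecordHeader fromArray = new RecordHeader("k", payload);

            // equals() compares the materialized values, so both forms are equal.
            System.out.println(fromBuffer.equals(fromArray)); // prints true
            System.out.println(fromBuffer);
        }
    }
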
http://git-wip-us.apache.org/repos/asf/kafka/blob/6185bc02/clients/src/main/java/org/apache/kafka/common/header/internals/RecordHeaders.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/header/internals/RecordHeaders.java b/clients/src/main/java/org/apache/kafka/common/header/internals/RecordHeaders.java
new file mode 100644
index 0000000..f23d799
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/header/internals/RecordHeaders.java
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.header.internals;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.kafka.common.header.Header;
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.record.Record;
+import org.apache.kafka.common.utils.AbstractIterator;
+
+public class RecordHeaders implements Headers {
+    
+    private final List<Header> headers;
+    private volatile boolean isReadOnly = false;
+
+    public RecordHeaders() {
+        this((Iterable<Header>) null);
+    }
+
+    public RecordHeaders(Header[] headers) {
+        if (headers == null) {
+            this.headers = new ArrayList<>();
+        } else {
+            this.headers = new ArrayList<>(Arrays.asList(headers));
+        }
+    }
+
+    public RecordHeaders(Iterable<Header> headers) {
+        // Use the efficient copy constructor if possible, fall back to iteration otherwise
+        if (headers == null) {
+            this.headers = new ArrayList<>();
+        } else if (headers instanceof RecordHeaders) {
+            this.headers = new ArrayList<>(((RecordHeaders) headers).headers);
+        } else if (headers instanceof Collection) {
+            this.headers = new ArrayList<>((Collection<Header>) headers);
+        } else {
+            this.headers = new ArrayList<>();
+            Iterator<Header> iterator = headers.iterator();
+            while (iterator.hasNext()) {
+                this.headers.add(iterator.next());
+            }
+        }
+    }
+
+    @Override
+    public Headers add(Header header) throws IllegalStateException {
+        canWrite();
+        headers.add(header);
+        return this;
+    }
+
+    @Override
+    public Headers add(String key, byte[] value) throws IllegalStateException {
+        return add(new RecordHeader(key, value));
+    }
+
+    @Override
+    public Headers remove(String key) throws IllegalStateException {
+        canWrite();
+        checkKey(key);
+        Iterator<Header> iterator = iterator();
+        while (iterator.hasNext()) {
+            if (iterator.next().key().equals(key)) {
+                iterator.remove();
+            }
+        }
+        return this;
+    }
+
+    @Override
+    public Header lastHeader(String key) {
+        checkKey(key);
+        for (int i = headers.size() - 1; i >= 0; i--) {
+            Header header = headers.get(i);
+            if (header.key().equals(key)) {
+                return header;
+            }
+        }
+        return null;
+    }
+
+    @Override
+    public Iterable<Header> headers(final String key) {
+        checkKey(key);
+        return new Iterable<Header>() {
+            @Override
+            public Iterator<Header> iterator() {
+                return new FilterByKeyIterator(headers.iterator(), key);
+            }
+        };
+    }
+
+    @Override
+    public Iterator<Header> iterator() {
+        return closeAware(headers.iterator());
+    }
+
+    public void setReadOnly() {
+        this.isReadOnly = true;
+    }
+
+    public Header[] toArray() {
+        return headers.isEmpty() ? Record.EMPTY_HEADERS : headers.toArray(new Header[headers.size()]);
+    }
+
+    private void checkKey(String key) {
+        if (key == null) {
+            throw new IllegalArgumentException("key cannot be null.");
+        }
+    }
+
+    private void canWrite() {
+        if (isReadOnly) {
+            throw new IllegalStateException("RecordHeaders is read-only and can no longer be modified.");
+        }
+    }
+
+    private Iterator<Header> closeAware(final Iterator<Header> original) {
+        return new Iterator<Header>() {
+            @Override
+            public boolean hasNext() {
+                return original.hasNext();
+            }
+
+            public Header next() {
+                return original.next();
+            }
+
+            @Override
+            public void remove() {
+                canWrite();
+                original.remove();
+            }
+        };
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+
+        RecordHeaders headers1 = (RecordHeaders) o;
+
+        return headers != null ? headers.equals(headers1.headers) : headers1.headers == null;
+    }
+
+    @Override
+    public int hashCode() {
+        return headers != null ? headers.hashCode() : 0;
+    }
+
+    @Override
+    public String toString() {
+        return "RecordHeaders(" +
+               "headers = " + headers +
+               ", isReadOnly = " + isReadOnly +
+               ')';
+    }
+
+    private static final class FilterByKeyIterator extends AbstractIterator<Header> {
+
+        private final Iterator<Header> original;
+        private final String key;
+
+        private FilterByKeyIterator(Iterator<Header> original, String key) {
+            this.original = original;
+            this.key = key;
+        }
+
+        protected Header makeNext() {
+            while (true) {
+                if (original.hasNext()) {
+                    Header header = original.next();
+                    if (!header.key().equals(key)) {
+                        continue;
+                    }
+
+                    return header;
+                }
+                return this.allDone();
+            }
+        }
+    }
+}

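A sketch of the read-only switch: KafkaProducer.doSend() calls setReadOnly() before appending to the accumulator, after which mutation attempts fail:

    import java.nio.charset.StandardCharsets;

    import org.apache.kafka.common.header.internals.RecordHeaders;

    public class ReadOnlyHeadersSketch {
        public static void main(String[] args) {
            RecordHeaders headers = new RecordHeaders();
            headers.add("a", "1".getBytes(StandardCharsets.UTF_8));

            headers.setReadOnly();
            try {
                headers.add("b", "2".getBytes(StandardCharsets.UTF_8));
            } catch (IllegalStateException e) {
                System.out.println("mutation rejected: " + e.getMessage());
            }

            // Reads still work once the headers are read-only.
            System.out.println(headers.lastHeader("a") != null); // prints true
        }
    }
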
http://git-wip-us.apache.org/repos/asf/kafka/blob/6185bc02/clients/src/main/java/org/apache/kafka/common/record/AbstractLegacyRecordBatch.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/AbstractLegacyRecordBatch.java b/clients/src/main/java/org/apache/kafka/common/record/AbstractLegacyRecordBatch.java
index ddb2bc7..85fcb2a 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/AbstractLegacyRecordBatch.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/AbstractLegacyRecordBatch.java
@@ -18,6 +18,7 @@ package org.apache.kafka.common.record;
 
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.errors.CorruptRecordException;
+import org.apache.kafka.common.header.Header;
 import org.apache.kafka.common.utils.AbstractIterator;
 import org.apache.kafka.common.utils.ByteBufferInputStream;
 import org.apache.kafka.common.utils.ByteUtils;

http://git-wip-us.apache.org/repos/asf/kafka/blob/6185bc02/clients/src/main/java/org/apache/kafka/common/record/AbstractRecords.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/AbstractRecords.java b/clients/src/main/java/org/apache/kafka/common/record/AbstractRecords.java
index decc06c..87df7e4 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/AbstractRecords.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/AbstractRecords.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.common.record;
 
+import org.apache.kafka.common.header.Header;
 import org.apache.kafka.common.utils.AbstractIterator;
 import org.apache.kafka.common.utils.Utils;
 
@@ -164,9 +165,9 @@ public abstract class AbstractRecords implements Records {
         return compressionType == CompressionType.NONE ? size : Math.min(Math.max(size / 2, 1024), 1 << 16);
     }
 
-    public static int sizeInBytesUpperBound(byte magic, byte[] key, byte[] value) {
+    public static int sizeInBytesUpperBound(byte magic, byte[] key, byte[] value, Header[] headers) {
         if (magic >= RecordBatch.MAGIC_VALUE_V2)
-            return DefaultRecordBatch.batchSizeUpperBound(key, value, Record.EMPTY_HEADERS);
+            return DefaultRecordBatch.batchSizeUpperBound(key, value, headers);
         else
             return Records.LOG_OVERHEAD + LegacyRecord.recordSize(magic, key, value);
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/6185bc02/clients/src/main/java/org/apache/kafka/common/record/DefaultRecord.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/DefaultRecord.java b/clients/src/main/java/org/apache/kafka/common/record/DefaultRecord.java
index a4b1d11..e0794d8 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/DefaultRecord.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/DefaultRecord.java
@@ -16,6 +16,8 @@
  */
 package org.apache.kafka.common.record;
 
+import org.apache.kafka.common.header.Header;
+import org.apache.kafka.common.header.internals.RecordHeader;
 import org.apache.kafka.common.utils.ByteBufferOutputStream;
 import org.apache.kafka.common.utils.ByteUtils;
 import org.apache.kafka.common.utils.Checksums;
@@ -223,13 +225,12 @@ public class DefaultRecord implements Record {
             ByteUtils.writeVarint(utf8Bytes.length, out);
             out.write(utf8Bytes);
 
-            ByteBuffer headerValue = header.value();
+            byte[] headerValue = header.value();
             if (headerValue == null) {
                 ByteUtils.writeVarint(-1, out);
             } else {
-                int headerValueSize = headerValue.remaining();
-                ByteUtils.writeVarint(headerValueSize, out);
-                Utils.writeTo(out, headerValue, headerValueSize);
+                ByteUtils.writeVarint(headerValue.length, out);
+                out.write(headerValue);
             }
         }
 
@@ -414,7 +415,7 @@ public class DefaultRecord implements Record {
                 buffer.position(buffer.position() + headerValueSize);
             }
 
-            headers[i] = new Header(headerKey, headerValue);
+            headers[i] = new RecordHeader(headerKey, headerValue);
         }
 
         return new DefaultRecord(sizeInBytes, attributes, offset, timestamp, sequence, key, value, headers);
@@ -480,12 +481,11 @@ public class DefaultRecord implements Record {
             int headerKeySize = Utils.utf8Length(headerKey);
             size += ByteUtils.sizeOfVarint(headerKeySize) + headerKeySize;
 
-            ByteBuffer headerValue = header.value();
+            byte[] headerValue = header.value();
             if (headerValue == null) {
                 size += NULL_VARINT_SIZE_BYTES;
             } else {
-                int headerValueSize = headerValue.remaining();
-                size += ByteUtils.sizeOfVarint(headerValueSize) + headerValueSize;
+                size += ByteUtils.sizeOfVarint(headerValue.length) + headerValue.length;
             }
         }
         return size;

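For reference, each header is encoded as varint(utf8 key length), key bytes, varint(value length, or -1 for a null value), value bytes. A standalone sketch of that size arithmetic, mirroring Kafka's zig-zag varint rules in plain Java:

    public class HeaderSizeSketch {
        // Mirrors the per-header size computation above, using plain Java
        // instead of Kafka's Utils/ByteUtils helpers.
        static int headerSizeInBytes(String key, byte[] value) {
            int keySize = key.getBytes(java.nio.charset.StandardCharsets.UTF_8).length;
            int size = sizeOfVarint(keySize) + keySize;
            if (value == null)
                size += sizeOfVarint(-1); // null values are encoded as varint length -1
            else
                size += sizeOfVarint(value.length) + value.length;
            return size;
        }

        // Length in bytes of a zig-zag varint, matching ByteUtils.sizeOfVarint.
        static int sizeOfVarint(int value) {
            int v = (value << 1) ^ (value >> 31); // zig-zag encode
            int bytes = 1;
            while ((v & 0xffffff80) != 0) {
                bytes++;
                v >>>= 7;
            }
            return bytes;
        }

        public static void main(String[] args) {
            // varint(1) + "k" + varint(1) + one value byte = 4 bytes
            System.out.println(headerSizeInBytes("k", new byte[] {'v'}));
        }
    }
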
http://git-wip-us.apache.org/repos/asf/kafka/blob/6185bc02/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java b/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java
index 2680f30..93cd2eb 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java
@@ -17,6 +17,7 @@
 package org.apache.kafka.common.record;
 
 import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.header.Header;
 import org.apache.kafka.common.utils.ByteBufferInputStream;
 import org.apache.kafka.common.utils.ByteUtils;
 import org.apache.kafka.common.utils.CloseableIterator;

http://git-wip-us.apache.org/repos/asf/kafka/blob/6185bc02/clients/src/main/java/org/apache/kafka/common/record/Header.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/Header.java b/clients/src/main/java/org/apache/kafka/common/record/Header.java
deleted file mode 100644
index 2ca077c..0000000
--- a/clients/src/main/java/org/apache/kafka/common/record/Header.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.common.record;
-
-import org.apache.kafka.common.utils.Utils;
-
-import java.nio.ByteBuffer;
-import java.util.Objects;
-
-public class Header {
-    private final String key;
-    private final ByteBuffer value;
-
-    public Header(String key, ByteBuffer value) {
-        Objects.requireNonNull(key, "Null header keys are not permitted");
-        this.key = key;
-        this.value = value;
-    }
-
-    public Header(String key, byte[] value) {
-        this(key, Utils.wrapNullable(value));
-    }
-
-    public String key() {
-        return key;
-    }
-
-    public ByteBuffer value() {
-        return value == null ? null : value.duplicate();
-    }
-
-    @Override
-    public boolean equals(Object o) {
-        if (this == o)
-            return true;
-        if (o == null || getClass() != o.getClass())
-            return false;
-
-        Header header = (Header) o;
-        return (key == null ? header.key == null : key.equals(header.key)) &&
-                (value == null ? header.value == null : value.equals(header.value));
-    }
-
-    @Override
-    public int hashCode() {
-        int result = key != null ? key.hashCode() : 0;
-        result = 31 * result + (value != null ? value.hashCode() : 0);
-        return result;
-    }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/6185bc02/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
index eb52134..b9d65a5 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
@@ -17,6 +17,7 @@
 package org.apache.kafka.common.record;
 
 import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.header.Header;
 import org.apache.kafka.common.protocol.types.Struct;
 import org.apache.kafka.common.utils.ByteBufferOutputStream;
 
@@ -412,6 +413,7 @@ public class MemoryRecordsBuilder {
         return appendWithOffset(offset, false, record.timestamp(), record.key(), record.value(), record.headers());
     }
 
+
     /**
      * Append a new record at the next sequential offset.
      * @param timestamp The record timestamp
@@ -420,7 +422,19 @@ public class MemoryRecordsBuilder {
      * @return crc of the record
      */
     public long append(long timestamp, ByteBuffer key, ByteBuffer value) {
-        return appendWithOffset(nextSequentialOffset(), false, timestamp, key, value, Record.EMPTY_HEADERS);
+        return append(timestamp, key, value, Record.EMPTY_HEADERS);
+    }
+
+    /**
+     * Append a new record at the next sequential offset.
+     * @param timestamp The record timestamp
+     * @param key The record key
+     * @param value The record value
+     * @param headers The record headers if there are any
+     * @return crc of the record
+     */
+    public long append(long timestamp, ByteBuffer key, ByteBuffer value, Header[] headers) {
+        return appendWithOffset(nextSequentialOffset(), false, timestamp, key, value, headers);
     }
 
     /**
@@ -431,7 +445,19 @@ public class MemoryRecordsBuilder {
      * @return crc of the record
      */
     public long append(long timestamp, byte[] key, byte[] value) {
-        return append(timestamp, wrapNullable(key), wrapNullable(value));
+        return append(timestamp, wrapNullable(key), wrapNullable(value), Record.EMPTY_HEADERS);
+    }
+
+    /**
+     * Append a new record at the next sequential offset.
+     * @param timestamp The record timestamp
+     * @param key The record key
+     * @param value The record value
+     * @param headers The record headers, if any
+     * @return crc of the record
+     */
+    public long append(long timestamp, byte[] key, byte[] value, Header[] headers) {
+        return append(timestamp, wrapNullable(key), wrapNullable(value), headers);
     }
 
     /**

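The new overloads keep the existing two-argument append source-compatible while making header-bearing appends explicit. A short sketch of both forms (the builder arguments mirror the FetcherTest usage further down; BuilderExample is an illustrative name):

    import java.nio.ByteBuffer;

    import org.apache.kafka.common.header.Header;
    import org.apache.kafka.common.header.internals.RecordHeader;
    import org.apache.kafka.common.record.CompressionType;
    import org.apache.kafka.common.record.MemoryRecords;
    import org.apache.kafka.common.record.MemoryRecordsBuilder;
    import org.apache.kafka.common.record.TimestampType;

    public class BuilderExample {
        public static void main(String[] args) {
            MemoryRecordsBuilder builder = MemoryRecords.builder(ByteBuffer.allocate(1024),
                    CompressionType.NONE, TimestampType.CREATE_TIME, 0L);

            // Old form: delegates to the new overload with Record.EMPTY_HEADERS.
            builder.append(0L, "key".getBytes(), "no-headers".getBytes());

            // New form: attach headers to the appended record.
            Header[] headers = {new RecordHeader("source", "test".getBytes())};
            builder.append(0L, "key".getBytes(), "with-headers".getBytes(), headers);

            MemoryRecords records = builder.build();
            System.out.println("built " + records.sizeInBytes() + " bytes");
        }
    }
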
http://git-wip-us.apache.org/repos/asf/kafka/blob/6185bc02/clients/src/main/java/org/apache/kafka/common/record/Record.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/Record.java b/clients/src/main/java/org/apache/kafka/common/record/Record.java
index 437ee3b..fdf41b3 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/Record.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/Record.java
@@ -18,6 +18,8 @@ package org.apache.kafka.common.record;
 
 import java.nio.ByteBuffer;
 
+import org.apache.kafka.common.header.Header;
+
 /**
  * A log record is a tuple consisting of a unique offset in the log, a sequence number assigned by
  * the producer, a timestamp, a key and a value.

http://git-wip-us.apache.org/repos/asf/kafka/blob/6185bc02/clients/src/main/java/org/apache/kafka/common/record/SimpleRecord.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/SimpleRecord.java b/clients/src/main/java/org/apache/kafka/common/record/SimpleRecord.java
index 0a5cbcf..fd361c4 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/SimpleRecord.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/SimpleRecord.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.common.record;
 
+import org.apache.kafka.common.header.Header;
 import org.apache.kafka.common.utils.Utils;
 
 import java.nio.ByteBuffer;

http://git-wip-us.apache.org/repos/asf/kafka/blob/6185bc02/clients/src/main/java/org/apache/kafka/common/serialization/Deserializer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/serialization/Deserializer.java b/clients/src/main/java/org/apache/kafka/common/serialization/Deserializer.java
index 383f6e3..53c3ba2 100644
--- a/clients/src/main/java/org/apache/kafka/common/serialization/Deserializer.java
+++ b/clients/src/main/java/org/apache/kafka/common/serialization/Deserializer.java
@@ -35,7 +35,7 @@ public interface Deserializer<T> extends Closeable {
      * @param isKey whether is for key or value
      */
     public void configure(Map<String, ?> configs, boolean isKey);
-    
+
     /**
      * Deserialize a record value from a bytearray into a value or object.
      * @param topic topic associated with the data

http://git-wip-us.apache.org/repos/asf/kafka/blob/6185bc02/clients/src/main/java/org/apache/kafka/common/serialization/ExtendedDeserializer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/serialization/ExtendedDeserializer.java b/clients/src/main/java/org/apache/kafka/common/serialization/ExtendedDeserializer.java
new file mode 100644
index 0000000..5de154a
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/serialization/ExtendedDeserializer.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.serialization;
+
+import java.util.Map;
+
+import org.apache.kafka.common.header.Headers;
+
+public interface ExtendedDeserializer<T> extends Deserializer<T> {
+
+    T deserialize(String topic, Headers headers, byte[] data);
+
+    class Wrapper<T> implements ExtendedDeserializer<T> {
+
+        private final Deserializer<T> deserializer;
+
+        public Wrapper(Deserializer<T> deserializer) {
+            this.deserializer = deserializer;
+        }
+
+
+        @Override
+        public T deserialize(String topic, Headers headers, byte[] data) {
+            return deserialize(topic, data);
+        }
+
+        @Override
+        public void configure(Map<String, ?> configs, boolean isKey) {
+            deserializer.configure(configs, isKey);
+        }
+
+        @Override
+        public T deserialize(String topic, byte[] data) {
+            return deserializer.deserialize(topic, data);
+        }
+
+        @Override
+        public void close() {
+            deserializer.close();
+        }
+    }
+}

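ExtendedDeserializer.Wrapper lets any existing Deserializer satisfy the new header-aware interface: the three-argument overload simply falls through to the header-ignorant deserialize(topic, data). A sketch of both paths (StringDeserializer is assumed from the existing serialization package; HeaderAwareDeserializer is an illustrative name):

    import java.util.Map;

    import org.apache.kafka.common.header.Headers;
    import org.apache.kafka.common.serialization.ExtendedDeserializer;
    import org.apache.kafka.common.serialization.StringDeserializer;

    public class HeaderAwareDeserializer implements ExtendedDeserializer<String> {

        // Adapting a legacy deserializer: headers are accepted but ignored.
        public static ExtendedDeserializer<String> wrapped() {
            return new ExtendedDeserializer.Wrapper<>(new StringDeserializer());
        }

        @Override
        public void configure(Map<String, ?> configs, boolean isKey) { }

        @Override
        public String deserialize(String topic, byte[] data) {
            return deserialize(topic, null, data);
        }

        @Override
        public String deserialize(String topic, Headers headers, byte[] data) {
            // A header-aware implementation can branch on header values,
            // e.g. to select an encoding or schema version.
            return data == null ? null : new String(data);
        }

        @Override
        public void close() { }
    }
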
http://git-wip-us.apache.org/repos/asf/kafka/blob/6185bc02/clients/src/main/java/org/apache/kafka/common/serialization/ExtendedSerializer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/serialization/ExtendedSerializer.java b/clients/src/main/java/org/apache/kafka/common/serialization/ExtendedSerializer.java
new file mode 100644
index 0000000..8740631
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/serialization/ExtendedSerializer.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.serialization;
+
+import java.util.Map;
+
+import org.apache.kafka.common.header.Headers;
+
+public interface ExtendedSerializer<T> extends Serializer<T> {
+
+    byte[] serialize(String topic, Headers headers, T data);
+
+    class Wrapper<T> implements ExtendedSerializer<T> {
+
+        private final Serializer<T> serializer;
+
+        public Wrapper(Serializer<T> serializer) {
+            this.serializer = serializer;
+        }
+
+        @Override
+        public byte[] serialize(String topic, Headers headers, T data) {
+            return serialize(topic, data);
+        }
+
+        @Override
+        public void configure(Map<String, ?> configs, boolean isKey) {
+            serializer.configure(configs, isKey);
+        }
+
+        @Override
+        public byte[] serialize(String topic, T data) {
+            return serializer.serialize(topic, data);
+        }
+
+        @Override
+        public void close() {
+            serializer.close();
+        }
+    }
+}
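The serializer side mirrors the deserializer exactly. A sketch of adapting a header-ignorant Serializer (assuming the existing StringSerializer; the wrapper drops the headers argument when delegating):

    import org.apache.kafka.common.header.internals.RecordHeaders;
    import org.apache.kafka.common.serialization.ExtendedSerializer;
    import org.apache.kafka.common.serialization.StringSerializer;

    public class SerializerWrapperExample {
        public static void main(String[] args) {
            // Adapt a header-ignorant Serializer to the new header-aware interface.
            ExtendedSerializer<String> serializer =
                    new ExtendedSerializer.Wrapper<>(new StringSerializer());

            // The three-argument overload delegates to serialize(topic, data);
            // the headers are accepted but not used by the wrapper.
            byte[] bytes = serializer.serialize("topic", new RecordHeaders(), "payload");
            System.out.println(bytes.length + " bytes");
        }
    }
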

http://git-wip-us.apache.org/repos/asf/kafka/blob/6185bc02/clients/src/main/java/org/apache/kafka/common/serialization/Serializer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/serialization/Serializer.java b/clients/src/main/java/org/apache/kafka/common/serialization/Serializer.java
index 233a658..43d234c 100644
--- a/clients/src/main/java/org/apache/kafka/common/serialization/Serializer.java
+++ b/clients/src/main/java/org/apache/kafka/common/serialization/Serializer.java
@@ -43,7 +43,6 @@ public interface Serializer<T> extends Closeable {
      */
     public byte[] serialize(String topic, T data);
 
-
     /**
      * Close this serializer.
      * This method has to be idempotent if the serializer is used in KafkaProducer because it might be called

http://git-wip-us.apache.org/repos/asf/kafka/blob/6185bc02/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
index a7d2a1b..1a4de98 100755
--- a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
+++ b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
@@ -186,6 +186,15 @@ public class Utils {
     }
 
     /**
+     * Read a byte array of the given size from the buffer, starting at the buffer's current position
+     * @param buffer The buffer to read from
+     * @param size The number of bytes to read into the array
+     */
+    public static byte[] toArray(ByteBuffer buffer, int size) {
+        return toArray(buffer, 0, size);
+    }
+
+    /**
      * Convert a ByteBuffer to a nullable array.
      * @param buffer The buffer to convert
      * @return The resulting array or null if the buffer is null

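The new two-argument toArray reads relative to the buffer's current position, since the existing three-argument toArray applies its offset from the current position. A small sketch (ToArrayExample is an illustrative name):

    import java.nio.ByteBuffer;

    import org.apache.kafka.common.utils.Utils;

    public class ToArrayExample {
        public static void main(String[] args) {
            ByteBuffer buffer = ByteBuffer.wrap("headervalue".getBytes());
            buffer.position(6); // skip past "header"

            // Reads 5 bytes starting at the current position,
            // equivalent to Utils.toArray(buffer, 0, 5).
            byte[] value = Utils.toArray(buffer, 5);
            System.out.println(new String(value)); // prints "value"
        }
    }
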
http://git-wip-us.apache.org/repos/asf/kafka/blob/6185bc02/clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerRecordTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerRecordTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerRecordTest.java
index 9e62233..a8a5283 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerRecordTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerRecordTest.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.clients.consumer;
 
+import org.apache.kafka.common.header.internals.RecordHeaders;
 import org.apache.kafka.common.record.TimestampType;
 import org.junit.Test;
 
@@ -42,6 +43,7 @@ public class ConsumerRecordTest {
         assertEquals(ConsumerRecord.NULL_CHECKSUM, record.checksum());
         assertEquals(ConsumerRecord.NULL_SIZE, record.serializedKeySize());
         assertEquals(ConsumerRecord.NULL_SIZE, record.serializedValueSize());
+        assertEquals(new RecordHeaders(), record.headers());
     }
 
 

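As the added assertion implies, headers() on a ConsumerRecord built without explicit headers defaults to an empty RecordHeaders rather than null. A sketch (DefaultHeadersExample is an illustrative name):

    import org.apache.kafka.clients.consumer.ConsumerRecord;

    public class DefaultHeadersExample {
        public static void main(String[] args) {
            ConsumerRecord<String, String> record =
                    new ConsumerRecord<>("topic", 0, 0L, "key", "value");

            // No headers were supplied, so headers() is empty rather than null.
            System.out.println(record.headers().iterator().hasNext()); // false
        }
    }
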
http://git-wip-us.apache.org/repos/asf/kafka/blob/6185bc02/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
index b41e6ac..f0dd09c 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
@@ -36,12 +36,14 @@ import org.apache.kafka.common.errors.RecordTooLargeException;
 import org.apache.kafka.common.errors.SerializationException;
 import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.errors.TopicAuthorizationException;
+import org.apache.kafka.common.header.Header;
 import org.apache.kafka.common.metrics.KafkaMetric;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.record.CompressionType;
 import org.apache.kafka.common.record.ControlRecordType;
+import org.apache.kafka.common.header.internals.RecordHeader;
 import org.apache.kafka.common.record.LegacyRecord;
 import org.apache.kafka.common.record.MemoryRecords;
 import org.apache.kafka.common.record.MemoryRecordsBuilder;
@@ -76,6 +78,7 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
@@ -336,6 +339,50 @@ public class FetcherTest {
     }
 
     @Test
+    public void testHeaders() {
+        Fetcher<byte[], byte[]> fetcher = createFetcher(subscriptions, new Metrics(time));
+
+        MemoryRecordsBuilder builder = MemoryRecords.builder(ByteBuffer.allocate(1024), CompressionType.NONE, TimestampType.CREATE_TIME, 1L);
+        builder.append(0L, "key".getBytes(), "value-1".getBytes());
+
+        Header[] headersArray = new Header[1];
+        headersArray[0] = new RecordHeader("headerKey", "headerValue".getBytes(StandardCharsets.UTF_8));
+        builder.append(0L, "key".getBytes(), "value-2".getBytes(), headersArray);
+
+        Header[] headersArray2 = new Header[2];
+        headersArray2[0] = new RecordHeader("headerKey", "headerValue".getBytes(StandardCharsets.UTF_8));
+        headersArray2[1] = new RecordHeader("headerKey", "headerValue2".getBytes(StandardCharsets.UTF_8));
+        builder.append(0L, "key".getBytes(), "value-3".getBytes(), headersArray2);
+
+        MemoryRecords memoryRecords = builder.build();
+
+        List<ConsumerRecord<byte[], byte[]>> records;
+        subscriptions.assignFromUser(singleton(tp1));
+        subscriptions.seek(tp1, 1);
+
+        client.prepareResponse(matchesOffset(tp1, 1), fetchResponse(memoryRecords, Errors.NONE, 100L, 0));
+
+        assertEquals(1, fetcher.sendFetches());
+        consumerClient.poll(0);
+        records = fetcher.fetchedRecords().get(tp1);
+
+        assertEquals(3, records.size());
+
+        Iterator<ConsumerRecord<byte[], byte[]>> recordIterator = records.iterator();
+
+        ConsumerRecord<byte[], byte[]> record = recordIterator.next();
+        assertNull(record.headers().lastHeader("headerKey"));
+
+        record = recordIterator.next();
+        assertEquals("headerValue", new String(record.headers().lastHeader("headerKey").value(), StandardCharsets.UTF_8));
+        assertEquals("headerKey", record.headers().lastHeader("headerKey").key());
+
+        record = recordIterator.next();
+        assertEquals("headerValue2", new String(record.headers().lastHeader("headerKey").value(), StandardCharsets.UTF_8));
+        assertEquals("headerKey", record.headers().lastHeader("headerKey").key());
+    }
+
+    @Test
     public void testFetchMaxPollRecords() {
         Fetcher<byte[], byte[]> fetcher = createFetcher(subscriptions, new Metrics(time), 2);
 

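The consumer-side access pattern exercised by testHeaders above: lastHeader(key) returns the most recently added header for that key, or null when the record carries none. A sketch (ConsumeHeadersExample is an illustrative name):

    import java.nio.charset.StandardCharsets;

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.common.header.Header;

    public class ConsumeHeadersExample {
        static void printHeader(ConsumerRecord<byte[], byte[]> record) {
            // Null means the record has no header with this key.
            Header header = record.headers().lastHeader("headerKey");
            if (header == null) {
                System.out.println("no header for key");
            } else {
                System.out.println(header.key() + " -> "
                        + new String(header.value(), StandardCharsets.UTF_8));
            }
        }
    }
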
http://git-wip-us.apache.org/repos/asf/kafka/blob/6185bc02/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
index 819f15e..514426d 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
@@ -25,8 +25,10 @@ import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.config.ConfigException;
 import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.internals.ClusterResourceListeners;
+import org.apache.kafka.common.header.internals.RecordHeader;
 import org.apache.kafka.common.network.Selectable;
 import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.apache.kafka.common.serialization.ExtendedSerializer;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.common.utils.Time;
@@ -51,6 +53,7 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.Properties;
 
+import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 @RunWith(PowerMockRunner.class)
@@ -347,5 +350,63 @@ public class KafkaProducerTest {
         }
         Assert.assertTrue("Topic should still exist in metadata", metadata.containsTopic(topic));
     }
+
+    @PrepareOnlyThisForTest(Metadata.class)
+    @Test
+    public void testHeaders() throws Exception {
+        Properties props = new Properties();
+        props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
+        ExtendedSerializer keySerializer = PowerMock.createNiceMock(ExtendedSerializer.class);
+        ExtendedSerializer valueSerializer = PowerMock.createNiceMock(ExtendedSerializer.class);
+
+        KafkaProducer<String, String> producer = new KafkaProducer<>(props, keySerializer, valueSerializer);
+        Metadata metadata = PowerMock.createNiceMock(Metadata.class);
+        MemberModifier.field(KafkaProducer.class, "metadata").set(producer, metadata);
+
+        String topic = "topic";
+        Collection<Node> nodes = Collections.singletonList(new Node(0, "host1", 1000));
+
+        final Cluster cluster = new Cluster(
+                "dummy",
+                nodes,
+                Arrays.asList(new PartitionInfo(topic, 0, null, null, null)),
+                Collections.<String>emptySet(),
+                Collections.<String>emptySet());
+
+
+        EasyMock.expect(metadata.fetch()).andReturn(cluster).anyTimes();
+
+        PowerMock.replay(metadata);
+
+        String value = "value";
+
+        ProducerRecord<String, String> record = new ProducerRecord<>(topic, value);
+        EasyMock.expect(keySerializer.serialize(topic, record.headers(), null)).andReturn(null).once();
+        EasyMock.expect(valueSerializer.serialize(topic, record.headers(), value)).andReturn(value.getBytes()).once();
+
+        PowerMock.replay(keySerializer);
+        PowerMock.replay(valueSerializer);
+
+
+        // Ensure headers can be mutated before send.
+        record.headers().add(new RecordHeader("test", "header2".getBytes()));
+
+        producer.send(record, null);
+
+        // Ensure headers are closed and cannot be mutated after send.
+        try {
+            record.headers().add(new RecordHeader("test", "test".getBytes()));
+            fail("Expected IllegalStateException to be raised");
+        } catch (IllegalStateException ise) {
+            // expected
+        }
+
+        // Ensure existing headers are unchanged and the last header for the key still has its original value.
+        assertTrue(Arrays.equals(record.headers().lastHeader("test").value(), "header2".getBytes()));
+
+        PowerMock.verify(valueSerializer);
+        PowerMock.verify(keySerializer);
+
+    }
 
 }

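The producer-side contract verified by testHeaders above: headers are freely mutable until send(), at which point they are closed and further mutation raises IllegalStateException. A sketch (the producer instance and topic are assumed; ProduceWithHeadersExample is an illustrative name):

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.header.internals.RecordHeader;

    public class ProduceWithHeadersExample {
        static void send(KafkaProducer<String, String> producer) {
            ProducerRecord<String, String> record = new ProducerRecord<>("topic", "value");

            // Headers may be added freely before the record is sent.
            record.headers().add(new RecordHeader("test", "header2".getBytes()));

            producer.send(record);

            // After send() the headers are closed; adding another header
            // here would throw IllegalStateException.
        }
    }
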
http://git-wip-us.apache.org/repos/asf/kafka/blob/6185bc02/clients/src/test/java/org/apache/kafka/clients/producer/ProducerRecordTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/ProducerRecordTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/ProducerRecordTest.java
index b5a7a60..dc3c898 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/ProducerRecordTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/ProducerRecordTest.java
@@ -46,7 +46,7 @@ public class ProducerRecordTest {
         ProducerRecord<String, Integer> valueMisMatch = new ProducerRecord<>("test", 1, "key", 2);
         assertFalse(producerRecord.equals(valueMisMatch));
 
-        ProducerRecord<String, Integer> nullFieldsRecord = new ProducerRecord<>("topic", null, null, null, null);
+        ProducerRecord<String, Integer> nullFieldsRecord = new ProducerRecord<>("topic", null, null, null, null, null);
         assertEquals(nullFieldsRecord, nullFieldsRecord);
         assertEquals(nullFieldsRecord.hashCode(), nullFieldsRecord.hashCode());
     }