Posted to commits@kafka.apache.org by gu...@apache.org on 2016/05/18 18:01:30 UTC

kafka git commit: MINOR: Use `Record` instead of `ByteBufferMessageSet` in `ProduceRequestTest`

Repository: kafka
Updated Branches:
  refs/heads/trunk 2bd7b6450 -> c36cc60f7


MINOR: Use `Record` instead of `ByteBufferMessageSet` in `ProduceRequestTest`

We want to phase out `ByteBufferMessageSet` eventually, so new code should favour `Record` where possible.

Also use a fixed timestamp in `testCorruptLz4ProduceRequest` to ensure that
the checksum is always the same.
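
For illustration only (not part of this commit): the serialized record bytes,
and therefore any checksum computed over them, include the timestamp, which is
why a fixed timestamp keeps the corrupted byte offset stable across runs. A
minimal Java sketch, assuming `Record.checksum()` exposes the record CRC (the
constructor is the one used in the diff below; the class name is hypothetical):

    import org.apache.kafka.common.record.Record;

    public class FixedTimestampChecksumSketch {
        public static void main(String[] args) {
            // Identical timestamp, key and value => identical serialized
            // bytes, so the CRC is stable across runs.
            Record a = new Record(1000000L, "key".getBytes(), "value".getBytes());
            Record b = new Record(1000000L, "key".getBytes(), "value".getBytes());
            System.out.println(a.checksum() == b.checksum()); // true
            // With System.currentTimeMillis() the bytes (and the checksum)
            // would differ from run to run, making byte-offset based
            // corruption in a test flaky.
        }
    }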

Author: Ismael Juma <is...@juma.me.uk>

Reviewers: Jason Gustafson, Guozhang Wang

Closes #1357 from ijuma/produce-request-test-improvement


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/c36cc60f
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/c36cc60f
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/c36cc60f

Branch: refs/heads/trunk
Commit: c36cc60f73ca1fe956fb8792bc538b1fdebb712d
Parents: 2bd7b64
Author: Ismael Juma <is...@juma.me.uk>
Authored: Wed May 18 11:01:27 2016 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Wed May 18 11:01:27 2016 -0700

----------------------------------------------------------------------
 .../java/org/apache/kafka/test/TestUtils.java   | 24 ++++++++++-
 .../unit/kafka/server/ProduceRequestTest.scala  | 45 +++++++++++++-------
 2 files changed, 52 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/c36cc60f/clients/src/test/java/org/apache/kafka/test/TestUtils.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/test/TestUtils.java b/clients/src/test/java/org/apache/kafka/test/TestUtils.java
index 1bfe578..742d14f 100644
--- a/clients/src/test/java/org/apache/kafka/test/TestUtils.java
+++ b/clients/src/test/java/org/apache/kafka/test/TestUtils.java
@@ -20,6 +20,7 @@ import static java.util.Arrays.asList;
 
 import java.io.File;
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.util.ArrayList;
@@ -31,9 +32,12 @@ import java.util.Random;
 import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.record.CompressionType;
+import org.apache.kafka.common.record.MemoryRecords;
+import org.apache.kafka.common.record.Record;
+import org.apache.kafka.common.record.Records;
 import org.apache.kafka.common.utils.Utils;
 
-
 /**
  * Helper functions for writing unit tests
  */
@@ -141,4 +145,22 @@ public class TestUtils {
         return file;
     }
 
+    /**
+     * Create a records buffer that includes the offset and message size at the start, as required when the buffer
+     * is sent as part of a `ProduceRequest`. This is why we can't use the
+     * `Record(long timestamp, byte[] key, byte[] value, CompressionType type, int valueOffset, int valueSize)`
+     * constructor directly: it does not include either of these fields.
+     */
+    public static ByteBuffer partitionRecordsBuffer(long offset, CompressionType compressionType, Record... records) {
+        int bufferSize = 0;
+        for (Record record : records)
+            bufferSize += Records.LOG_OVERHEAD + record.size();
+        ByteBuffer buffer = ByteBuffer.allocate(bufferSize);
+        MemoryRecords memoryRecords = MemoryRecords.emptyRecords(buffer, compressionType);
+        for (Record record : records)
+            memoryRecords.append(offset, record);
+        memoryRecords.close();
+        return memoryRecords.buffer();
+    }
+
 }
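
A hypothetical usage sketch for the new helper (not part of this commit; the
class name is invented, while `partitionRecordsBuffer` and the `Record`
constructor are the ones from the diff above):

    import java.nio.ByteBuffer;
    import org.apache.kafka.common.record.CompressionType;
    import org.apache.kafka.common.record.Record;
    import org.apache.kafka.test.TestUtils;

    public class PartitionRecordsBufferSketch {
        public static void main(String[] args) {
            // Build a buffer usable directly as a ProduceRequest partition
            // payload: [offset (8 bytes)][message size (4 bytes)][record].
            ByteBuffer buffer = TestUtils.partitionRecordsBuffer(0L, CompressionType.NONE,
                    new Record(1000000L, "key".getBytes(), "value".getBytes()));
            System.out.println("payload bytes: " + buffer.remaining());
        }
    }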

http://git-wip-us.apache.org/repos/asf/kafka/blob/c36cc60f/core/src/test/scala/unit/kafka/server/ProduceRequestTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ProduceRequestTest.scala b/core/src/test/scala/unit/kafka/server/ProduceRequestTest.scala
index 67f7d41..f80fa7d 100644
--- a/core/src/test/scala/unit/kafka/server/ProduceRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ProduceRequestTest.scala
@@ -17,10 +17,13 @@
 
 package kafka.server
 
-import kafka.message.{ByteBufferMessageSet, LZ4CompressionCodec, Message}
+import java.nio.ByteBuffer
+
 import kafka.utils.TestUtils
+import org.apache.kafka.test.{TestUtils => JTestUtils}
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.protocol.{ApiKeys, Errors, ProtoUtils}
+import org.apache.kafka.common.record.{CompressionType, Record}
 import org.apache.kafka.common.requests.{ProduceRequest, ProduceResponse}
 import org.junit.Assert._
 import org.junit.Test
@@ -36,17 +39,26 @@ class ProduceRequestTest extends BaseRequestTest {
   @Test
   def testSimpleProduceRequest() {
     val (partition, leader) = createTopicAndFindPartitionWithLeader("topic")
-    val messageBuffer = new ByteBufferMessageSet(new Message("value".getBytes, "key".getBytes,
-      System.currentTimeMillis(), 1: Byte)).buffer
-    val topicPartition = new TopicPartition("topic", partition)
-    val partitionRecords = Map(topicPartition -> messageBuffer)
-    val produceResponse = sendProduceRequest(leader, new ProduceRequest(-1, 3000, partitionRecords.asJava))
-    assertEquals(1, produceResponse.responses.size)
-    val (tp, partitionResponse) = produceResponse.responses.asScala.head
-    assertEquals(topicPartition, tp)
-    assertEquals(Errors.NONE.code, partitionResponse.errorCode)
-    assertEquals(0, partitionResponse.baseOffset)
-    assertEquals(-1, partitionResponse.timestamp)
+
+    def sendAndCheck(recordBuffer: ByteBuffer, expectedOffset: Long): ProduceResponse.PartitionResponse = {
+      val topicPartition = new TopicPartition("topic", partition)
+      val partitionRecords = Map(topicPartition -> recordBuffer)
+      val produceResponse = sendProduceRequest(leader, new ProduceRequest(-1, 3000, partitionRecords.asJava))
+      assertEquals(1, produceResponse.responses.size)
+      val (tp, partitionResponse) = produceResponse.responses.asScala.head
+      assertEquals(topicPartition, tp)
+      assertEquals(Errors.NONE.code, partitionResponse.errorCode)
+      assertEquals(expectedOffset, partitionResponse.baseOffset)
+      assertEquals(-1, partitionResponse.timestamp)
+      partitionResponse
+    }
+
+    sendAndCheck(JTestUtils.partitionRecordsBuffer(0, CompressionType.NONE,
+      new Record(System.currentTimeMillis(), "key".getBytes, "value".getBytes)), 0)
+
+    sendAndCheck(JTestUtils.partitionRecordsBuffer(0, CompressionType.GZIP,
+      new Record(System.currentTimeMillis(), "key1".getBytes, "value1".getBytes),
+      new Record(System.currentTimeMillis(), "key2".getBytes, "value2".getBytes)), 1)
   }
 
   /* returns a pair of partition id and leader id */
@@ -60,12 +72,13 @@ class ProduceRequestTest extends BaseRequestTest {
   @Test
   def testCorruptLz4ProduceRequest() {
     val (partition, leader) = createTopicAndFindPartitionWithLeader("topic")
-    val messageBuffer = new ByteBufferMessageSet(LZ4CompressionCodec, new Message("value".getBytes, "key".getBytes,
-      System.currentTimeMillis(), 1: Byte)).buffer
+    val timestamp = 1000000
+    val recordBuffer = JTestUtils.partitionRecordsBuffer(0, CompressionType.LZ4,
+      new Record(timestamp, "key".getBytes, "value".getBytes))
     // Change the lz4 checksum value so that it doesn't match the contents
-    messageBuffer.array.update(40, 0)
+    recordBuffer.array.update(40, 0)
     val topicPartition = new TopicPartition("topic", partition)
-    val partitionRecords = Map(topicPartition -> messageBuffer)
+    val partitionRecords = Map(topicPartition -> recordBuffer)
     val produceResponse = sendProduceRequest(leader, new ProduceRequest(-1, 3000, partitionRecords.asJava))
     assertEquals(1, produceResponse.responses.size)
     val (tp, partitionResponse) = produceResponse.responses.asScala.head