You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2016/05/18 18:01:30 UTC
kafka git commit: MINOR: Use `Record` instead of `ByteBufferMessageSet` in `ProduceRequestTest`
Repository: kafka
Updated Branches:
refs/heads/trunk 2bd7b6450 -> c36cc60f7
MINOR: Use `Record` instead of `ByteBufferMessageSet` in `ProduceRequestTest`
We want to phase out `ByteBufferMessageSet` eventually, so new code should favour `Record` where possible.
Also use a fixed timestamp in `testCorruptLz4ProduceRequest` to ensure that
the checksum is always the same.
Author: Ismael Juma <is...@juma.me.uk>
Reviewers: Jason Gustafson, Guozhang Wang
Closes #1357 from ijuma/produce-request-test-improvement
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/c36cc60f
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/c36cc60f
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/c36cc60f
Branch: refs/heads/trunk
Commit: c36cc60f73ca1fe956fb8792bc538b1fdebb712d
Parents: 2bd7b64
Author: Ismael Juma <is...@juma.me.uk>
Authored: Wed May 18 11:01:27 2016 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Wed May 18 11:01:27 2016 -0700
----------------------------------------------------------------------
.../java/org/apache/kafka/test/TestUtils.java | 24 ++++++++++-
.../unit/kafka/server/ProduceRequestTest.scala | 45 +++++++++++++-------
2 files changed, 52 insertions(+), 17 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/c36cc60f/clients/src/test/java/org/apache/kafka/test/TestUtils.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/test/TestUtils.java b/clients/src/test/java/org/apache/kafka/test/TestUtils.java
index 1bfe578..742d14f 100644
--- a/clients/src/test/java/org/apache/kafka/test/TestUtils.java
+++ b/clients/src/test/java/org/apache/kafka/test/TestUtils.java
@@ -20,6 +20,7 @@ import static java.util.Arrays.asList;
import java.io.File;
import java.io.IOException;
+import java.nio.ByteBuffer;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
@@ -31,9 +32,12 @@ import java.util.Random;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.record.CompressionType;
+import org.apache.kafka.common.record.MemoryRecords;
+import org.apache.kafka.common.record.Record;
+import org.apache.kafka.common.record.Records;
import org.apache.kafka.common.utils.Utils;
-
/**
* Helper functions for writing unit tests
*/
@@ -141,4 +145,22 @@ public class TestUtils {
return file;
}
+ /**
+ * Create a records buffer including the offset and message size at the start, which is required if the buffer is to
+ * be sent as part of `ProduceRequest`. This is the reason why we can't use
+ * `Record(long timestamp, byte[] key, byte[] value, CompressionType type, int valueOffset, int valueSize)` as this
+ * constructor does not include either of these fields. Records get consecutive offsets starting at `offset`.
+ */
+ public static ByteBuffer partitionRecordsBuffer(long offset, CompressionType compressionType, Record... records) {
+ int bufferSize = 0;
+ for (Record record : records)
+ bufferSize += Records.LOG_OVERHEAD + record.size();
+ ByteBuffer buffer = ByteBuffer.allocate(bufferSize);
+ MemoryRecords memoryRecords = MemoryRecords.emptyRecords(buffer, compressionType);
+ for (int i = 0; i < records.length; i++)
+ memoryRecords.append(offset + i, records[i]); // each record gets its own offset rather than sharing `offset`
+ memoryRecords.close();
+ return memoryRecords.buffer();
+ }
+
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/c36cc60f/core/src/test/scala/unit/kafka/server/ProduceRequestTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ProduceRequestTest.scala b/core/src/test/scala/unit/kafka/server/ProduceRequestTest.scala
index 67f7d41..f80fa7d 100644
--- a/core/src/test/scala/unit/kafka/server/ProduceRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ProduceRequestTest.scala
@@ -17,10 +17,13 @@
package kafka.server
-import kafka.message.{ByteBufferMessageSet, LZ4CompressionCodec, Message}
+import java.nio.ByteBuffer
+
import kafka.utils.TestUtils
+import org.apache.kafka.test.{TestUtils => JTestUtils}
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.protocol.{ApiKeys, Errors, ProtoUtils}
+import org.apache.kafka.common.record.{CompressionType, Record}
import org.apache.kafka.common.requests.{ProduceRequest, ProduceResponse}
import org.junit.Assert._
import org.junit.Test
@@ -36,17 +39,26 @@ class ProduceRequestTest extends BaseRequestTest {
@Test
def testSimpleProduceRequest() {
val (partition, leader) = createTopicAndFindPartitionWithLeader("topic")
- val messageBuffer = new ByteBufferMessageSet(new Message("value".getBytes, "key".getBytes,
- System.currentTimeMillis(), 1: Byte)).buffer
- val topicPartition = new TopicPartition("topic", partition)
- val partitionRecords = Map(topicPartition -> messageBuffer)
- val produceResponse = sendProduceRequest(leader, new ProduceRequest(-1, 3000, partitionRecords.asJava))
- assertEquals(1, produceResponse.responses.size)
- val (tp, partitionResponse) = produceResponse.responses.asScala.head
- assertEquals(topicPartition, tp)
- assertEquals(Errors.NONE.code, partitionResponse.errorCode)
- assertEquals(0, partitionResponse.baseOffset)
- assertEquals(-1, partitionResponse.timestamp)
+
+ def sendAndCheck(recordBuffer: ByteBuffer, expectedOffset: Long): ProduceResponse.PartitionResponse = {
+ val topicPartition = new TopicPartition("topic", partition)
+ val partitionRecords = Map(topicPartition -> recordBuffer)
+ val produceResponse = sendProduceRequest(leader, new ProduceRequest(-1, 3000, partitionRecords.asJava))
+ assertEquals(1, produceResponse.responses.size)
+ val (tp, partitionResponse) = produceResponse.responses.asScala.head
+ assertEquals(topicPartition, tp)
+ assertEquals(Errors.NONE.code, partitionResponse.errorCode)
+ assertEquals(expectedOffset, partitionResponse.baseOffset)
+ assertEquals(-1, partitionResponse.timestamp)
+ partitionResponse
+ }
+
+ sendAndCheck(JTestUtils.partitionRecordsBuffer(0, CompressionType.NONE,
+ new Record(System.currentTimeMillis(), "key".getBytes, "value".getBytes)), 0)
+
+ sendAndCheck(JTestUtils.partitionRecordsBuffer(0, CompressionType.GZIP,
+ new Record(System.currentTimeMillis(), "key1".getBytes, "value1".getBytes),
+ new Record(System.currentTimeMillis(), "key2".getBytes, "value2".getBytes)), 1)
}
/* returns a pair of partition id and leader id */
@@ -60,12 +72,13 @@ class ProduceRequestTest extends BaseRequestTest {
@Test
def testCorruptLz4ProduceRequest() {
val (partition, leader) = createTopicAndFindPartitionWithLeader("topic")
- val messageBuffer = new ByteBufferMessageSet(LZ4CompressionCodec, new Message("value".getBytes, "key".getBytes,
- System.currentTimeMillis(), 1: Byte)).buffer
+ val timestamp = 1000000
+ val recordBuffer = JTestUtils.partitionRecordsBuffer(0, CompressionType.LZ4,
+ new Record(timestamp, "key".getBytes, "value".getBytes))
// Change the lz4 checksum value so that it doesn't match the contents
- messageBuffer.array.update(40, 0)
+ recordBuffer.array.update(40, 0)
val topicPartition = new TopicPartition("topic", partition)
- val partitionRecords = Map(topicPartition -> messageBuffer)
+ val partitionRecords = Map(topicPartition -> recordBuffer)
val produceResponse = sendProduceRequest(leader, new ProduceRequest(-1, 3000, partitionRecords.asJava))
assertEquals(1, produceResponse.responses.size)
val (tp, partitionResponse) = produceResponse.responses.asScala.head