You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ij...@apache.org on 2018/03/09 21:57:08 UTC

[kafka] branch trunk updated: MINOR: Add PayloadGenerator to Trogdor (#4640)

This is an automated email from the ASF dual-hosted git repository.

ijuma pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new f1c112c  MINOR: Add PayloadGenerator to Trogdor (#4640)
f1c112c is described below

commit f1c112c63d8223e62cf9e2f31cc42321f1e6dada
Author: Anna Povzner <an...@confluent.io>
AuthorDate: Fri Mar 9 13:57:04 2018 -0800

    MINOR: Add PayloadGenerator to Trogdor (#4640)
    
    It generates the producer payload (key and value) and makes sure that the values are
    populated to target a realistic compression rate (0.3 - 0.4) if compression is used.
    The generated payload is deterministic and can be replayed from a given position.
    For now, all generated values are constant size, and key types can be configured
    to be either null or 8 bytes.
    
    Added messageSize parameter to producer spec, that specifies produced
    key + message size.
---
 .../kafka/trogdor/workload/PayloadGenerator.java   | 146 +++++++++++++++++++++
 .../kafka/trogdor/workload/PayloadKeyType.java     |  39 ++++++
 .../kafka/trogdor/workload/ProduceBenchSpec.java   |   8 ++
 .../kafka/trogdor/workload/ProduceBenchWorker.java |  10 +-
 .../kafka/trogdor/workload/RoundTripWorker.java    |  27 ++--
 .../trogdor/workload/PayloadGeneratorTest.java     | 144 ++++++++++++++++++++
 6 files changed, 356 insertions(+), 18 deletions(-)

diff --git a/tools/src/main/java/org/apache/kafka/trogdor/workload/PayloadGenerator.java b/tools/src/main/java/org/apache/kafka/trogdor/workload/PayloadGenerator.java
new file mode 100644
index 0000000..9acd5fa
--- /dev/null
+++ b/tools/src/main/java/org/apache/kafka/trogdor/workload/PayloadGenerator.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.trogdor.workload;
+
+import org.apache.kafka.clients.producer.ProducerRecord;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Random;
+
+/**
+ * Describes the payload for the producer record. Currently, it generates constant size values
+ * and either null keys or constant size key (depending on requested key type). The generator
+ * is deterministic -- two generator objects created with the same key type, message size, and
+ * value divergence ratio (see `valueDivergenceRatio` description) will generate the same sequence
+ * of key/value pairs.
+ */
+public class PayloadGenerator {
+
+    public static final double DEFAULT_VALUE_DIVERGENCE_RATIO = 0.3;
+    public static final int DEFAULT_MESSAGE_SIZE = 512;
+
+    /**
+     * This is the ratio of how much each next value is different from the previous value. This
+     * is directly related to compression rate we will get. Example: 0.3 divergence ratio gets us
+     * about 0.3 - 0.45 compression rate with lz4.
+     */
+    private final double valueDivergenceRatio;
+    private final long baseSeed;
+    private long currentPosition;
+    private byte[] baseRecordValue;
+    private PayloadKeyType recordKeyType;
+    private Random random;
+
+    public PayloadGenerator() {
+        this(DEFAULT_MESSAGE_SIZE, PayloadKeyType.KEY_NULL, DEFAULT_VALUE_DIVERGENCE_RATIO);
+    }
+
+    /**
+     * Generator will generate null keys and values of size `messageSize`
+     * @param messageSize number of bytes used for key + value
+     */
+    public PayloadGenerator(int messageSize) {
+        this(messageSize, PayloadKeyType.KEY_NULL, DEFAULT_VALUE_DIVERGENCE_RATIO);
+    }
+
+    /**
+     * Generator will generate keys of given type and values of size 'messageSize' - (key size).
+     * If the given key type requires more bytes than messageSize, then the resulting payload
+     * will be keys of size required for the given key type and 0-length values.
+     * @param messageSize number of bytes used for key + value
+     * @param keyType type of keys generated
+     */
+    public PayloadGenerator(int messageSize, PayloadKeyType keyType) {
+        this(messageSize, keyType, DEFAULT_VALUE_DIVERGENCE_RATIO);
+    }
+
+    /**
+     * Generator will generate keys of given type and values of size 'messageSize' - (key size).
+     * If the given key type requires more bytes than messageSize, then the resulting payload
+     * will be keys of size required for the given key type and 0-length values.
+     * @param messageSize key + value size
+     * @param valueDivergenceRatio ratio of how much each next value is different from the previous
+     *                             value. Used to approximately control target compression rate (if
+     *                             compression is used).
+     */
+    public PayloadGenerator(int messageSize, PayloadKeyType keyType,
+                            double valueDivergenceRatio) {
+        this.baseSeed = 856;  // some random number, may later let pass seed to constructor
+        this.currentPosition = 0;
+        this.valueDivergenceRatio = valueDivergenceRatio;
+        this.random = new Random(this.baseSeed);
+
+        final int valueSize = (messageSize > keyType.maxSizeInBytes())
+                              ? messageSize - keyType.maxSizeInBytes() : 0;
+        this.baseRecordValue = new byte[valueSize];
+        // initialize value with random bytes
+        for (int i = 0; i < baseRecordValue.length; ++i) {
+            baseRecordValue[i] = (byte) (random.nextInt(26) + 65);
+        }
+        this.recordKeyType = keyType;
+    }
+
+    /**
+     * Returns current position of the payload generator.
+     */
+    public long position() {
+        return currentPosition;
+    }
+
+    /**
+     * Creates record based on the current position, and increments current position.
+     */
+    public ProducerRecord<byte[], byte[]> nextRecord(String topicName) {
+        return nextRecord(topicName, currentPosition++);
+    }
+
+    /**
+     * Creates record based on the given position. Does not change the current position.
+     */
+    public ProducerRecord<byte[], byte[]> nextRecord(String topicName, long position) {
+        byte[] keyBytes = null;
+        if (recordKeyType == PayloadKeyType.KEY_MESSAGE_INDEX) {
+            keyBytes = ByteBuffer.allocate(recordKeyType.maxSizeInBytes()).putLong(position).array();
+        } else if (recordKeyType != PayloadKeyType.KEY_NULL) {
+            throw new UnsupportedOperationException(
+                "PayloadGenerator does not know how to generate key for key type " + recordKeyType);
+        }
+        return new ProducerRecord<>(topicName, keyBytes, nextValue(position));
+    }
+
+    @Override
+    public String toString() {
+        return "PayloadGenerator(recordKeySize=" + recordKeyType.maxSizeInBytes()
+               + ", recordValueSize=" + baseRecordValue.length
+               + ", valueDivergenceRatio=" + valueDivergenceRatio + ")";
+    }
+
+    /**
+     * Returns producer record value
+     */
+    private byte[] nextValue(long position) {
+        // set the seed based on the given position to make sure that the same value is generated
+        // for the same position.
+        random.setSeed(baseSeed + 31 * position + 1);
+        // randomize some of the payload to achieve expected compression rate
+        byte[] recordValue = Arrays.copyOf(baseRecordValue, baseRecordValue.length);
+        for (int i = 0; i < recordValue.length * valueDivergenceRatio; ++i)
+            recordValue[i] = (byte) (random.nextInt(26) + 65);
+        return recordValue;
+    }
+}
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/workload/PayloadKeyType.java b/tools/src/main/java/org/apache/kafka/trogdor/workload/PayloadKeyType.java
new file mode 100644
index 0000000..3ed98cd
--- /dev/null
+++ b/tools/src/main/java/org/apache/kafka/trogdor/workload/PayloadKeyType.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.trogdor.workload;
+
+/**
+ * Describes a key in producer payload
+ */
+public enum PayloadKeyType {
+    // null key
+    KEY_NULL(0),
+    // fixed size key containing a long integer representing a message index (i.e., position of
+    // the payload generator)
+    KEY_MESSAGE_INDEX(8);
+
+    private final int maxSizeInBytes;
+
+    PayloadKeyType(int maxSizeInBytes) {
+        this.maxSizeInBytes = maxSizeInBytes;
+    }
+
+    public int maxSizeInBytes() {
+        return maxSizeInBytes;
+    }
+}
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/workload/ProduceBenchSpec.java b/tools/src/main/java/org/apache/kafka/trogdor/workload/ProduceBenchSpec.java
index 9f25842..efb2d85 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/workload/ProduceBenchSpec.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/workload/ProduceBenchSpec.java
@@ -36,6 +36,7 @@ public class ProduceBenchSpec extends TaskSpec {
     private final String bootstrapServers;
     private final int targetMessagesPerSec;
     private final int maxMessages;
+    private final int messageSize;
     private final Map<String, String> producerConf;
     private final int totalTopics;
     private final int activeTopics;
@@ -47,6 +48,7 @@ public class ProduceBenchSpec extends TaskSpec {
                          @JsonProperty("bootstrapServers") String bootstrapServers,
                          @JsonProperty("targetMessagesPerSec") int targetMessagesPerSec,
                          @JsonProperty("maxMessages") int maxMessages,
+                         @JsonProperty("messageSize") int messageSize,
                          @JsonProperty("producerConf") Map<String, String> producerConf,
                          @JsonProperty("totalTopics") int totalTopics,
                          @JsonProperty("activeTopics") int activeTopics) {
@@ -55,6 +57,7 @@ public class ProduceBenchSpec extends TaskSpec {
         this.bootstrapServers = bootstrapServers;
         this.targetMessagesPerSec = targetMessagesPerSec;
         this.maxMessages = maxMessages;
+        this.messageSize = (messageSize == 0) ? PayloadGenerator.DEFAULT_MESSAGE_SIZE : messageSize;
         this.producerConf = producerConf;
         this.totalTopics = totalTopics;
         this.activeTopics = activeTopics;
@@ -81,6 +84,11 @@ public class ProduceBenchSpec extends TaskSpec {
     }
 
     @JsonProperty
+    public int messageSize() {
+        return messageSize;
+    }
+
+    @JsonProperty
     public Map<String, String> producerConf() {
         return producerConf;
     }
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/workload/ProduceBenchWorker.java b/tools/src/main/java/org/apache/kafka/trogdor/workload/ProduceBenchWorker.java
index 27e49cd..1bd386d 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/workload/ProduceBenchWorker.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/workload/ProduceBenchWorker.java
@@ -56,8 +56,6 @@ public class ProduceBenchWorker implements TaskWorker {
 
     private static final short REPLICATION_FACTOR = 3;
 
-    private static final int MESSAGE_SIZE = 512;
-
     private static final int THROTTLE_PERIOD_MS = 100;
 
     private final String id;
@@ -174,6 +172,8 @@ public class ProduceBenchWorker implements TaskWorker {
 
         private final KafkaProducer<byte[], byte[]> producer;
 
+        private final PayloadGenerator payloadGenerator;
+
         private final Throttle throttle;
 
         SendRecords() {
@@ -187,6 +187,7 @@ public class ProduceBenchWorker implements TaskWorker {
                 props.setProperty(entry.getKey(), entry.getValue());
             }
             this.producer = new KafkaProducer<>(props, new ByteArraySerializer(), new ByteArraySerializer());
+            this.payloadGenerator = new PayloadGenerator(spec.messageSize());
             this.throttle = new SendRecordsThrottle(perPeriod, producer);
         }
 
@@ -194,13 +195,11 @@ public class ProduceBenchWorker implements TaskWorker {
         public Void call() throws Exception {
             long startTimeMs = Time.SYSTEM.milliseconds();
             try {
-                byte[] key = new byte[MESSAGE_SIZE];
-                byte[] value = new byte[MESSAGE_SIZE];
                 Future<RecordMetadata> future = null;
                 try {
                     for (int m = 0; m < spec.maxMessages(); m++) {
                         for (int i = 0; i < spec.activeTopics(); i++) {
-                            ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(topicIndexToName(i), key, value);
+                            ProducerRecord<byte[], byte[]> record = payloadGenerator.nextRecord(topicIndexToName(i));
                             future = producer.send(record, new SendRecordsCallback(this, Time.SYSTEM.milliseconds()));
                         }
                         throttle.increment();
@@ -217,6 +216,7 @@ public class ProduceBenchWorker implements TaskWorker {
                 statusUpdaterFuture.cancel(false);
                 new StatusUpdater(histogram).run();
                 long curTimeMs = Time.SYSTEM.milliseconds();
+                log.info("Produced {}", payloadGenerator);
                 log.info("Sent {} total record(s) in {} ms.  status: {}",
                     histogram.summarize().numSamples(), curTimeMs - startTimeMs, status.get());
             }
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/workload/RoundTripWorker.java b/tools/src/main/java/org/apache/kafka/trogdor/workload/RoundTripWorker.java
index 9031c45..5dfac1f 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/workload/RoundTripWorker.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/workload/RoundTripWorker.java
@@ -33,8 +33,6 @@ import org.apache.kafka.common.errors.WakeupException;
 import org.apache.kafka.common.internals.KafkaFutureImpl;
 import org.apache.kafka.common.serialization.ByteArrayDeserializer;
 import org.apache.kafka.common.serialization.ByteArraySerializer;
-import org.apache.kafka.common.serialization.StringDeserializer;
-import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.trogdor.common.Platform;
@@ -44,6 +42,7 @@ import org.apache.kafka.trogdor.task.TaskWorker;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Iterator;
@@ -60,7 +59,7 @@ import java.util.concurrent.atomic.AtomicReference;
 public class RoundTripWorker implements TaskWorker {
     private static final int THROTTLE_PERIOD_MS = 100;
 
-    private static final int VALUE_SIZE = 512;
+    private static final int MESSAGE_SIZE = 512;
 
     private static final int LOG_INTERVAL_MS = 5000;
 
@@ -82,9 +81,11 @@ public class RoundTripWorker implements TaskWorker {
 
     private KafkaFutureImpl<String> doneFuture;
 
-    private KafkaProducer<String, byte[]> producer;
+    private KafkaProducer<byte[], byte[]> producer;
 
-    private KafkaConsumer<String, byte[]> consumer;
+    private PayloadGenerator payloadGenerator;
+
+    private KafkaConsumer<byte[], byte[]> consumer;
 
     private CountDownLatch unackedSends;
 
@@ -177,16 +178,16 @@ public class RoundTripWorker implements TaskWorker {
             props.put(ProducerConfig.CLIENT_ID_CONFIG, "producer." + id);
             props.put(ProducerConfig.ACKS_CONFIG, "all");
             props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 105000);
-            producer = new KafkaProducer<>(props, new StringSerializer(),
+            producer = new KafkaProducer<>(props, new ByteArraySerializer(),
                 new ByteArraySerializer());
             int perPeriod = WorkerUtils.
                 perSecToPerPeriod(spec.targetMessagesPerSec(), THROTTLE_PERIOD_MS);
             this.throttle = new Throttle(perPeriod, THROTTLE_PERIOD_MS);
+            payloadGenerator = new PayloadGenerator(MESSAGE_SIZE, PayloadKeyType.KEY_MESSAGE_INDEX);
         }
 
         @Override
         public void run() {
-            byte[] value = new byte[VALUE_SIZE];
             final ToSendTracker toSendTracker = new ToSendTracker(spec.maxMessages());
             long messagesSent = 0;
             long uniqueMessagesSent = 0;
@@ -204,8 +205,8 @@ public class RoundTripWorker implements TaskWorker {
                         uniqueMessagesSent++;
                     }
                     messagesSent++;
-                    ProducerRecord<String, byte[]> record =
-                        new ProducerRecord<>(TOPIC_NAME, 0, String.valueOf(messageIndex), value);
+                    // we explicitly specify generator position based on message index
+                    ProducerRecord<byte[], byte[]> record = payloadGenerator.nextRecord(TOPIC_NAME, messageIndex);
                     producer.send(record, new Callback() {
                         @Override
                         public void onCompletion(RecordMetadata metadata, Exception exception) {
@@ -267,7 +268,7 @@ public class RoundTripWorker implements TaskWorker {
             props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
             props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, 105000);
             props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 100000);
-            consumer = new KafkaConsumer<>(props, new StringDeserializer(),
+            consumer = new KafkaConsumer<>(props, new ByteArrayDeserializer(),
                 new ByteArrayDeserializer());
             consumer.subscribe(Collections.singleton(TOPIC_NAME));
         }
@@ -283,9 +284,9 @@ public class RoundTripWorker implements TaskWorker {
                 while (true) {
                     try {
                         pollInvoked++;
-                        ConsumerRecords<String, byte[]> records = consumer.poll(50);
-                        for (ConsumerRecord<String, byte[]> record : records.records(TOPIC_NAME)) {
-                            int messageIndex = Integer.parseInt(record.key());
+                        ConsumerRecords<byte[], byte[]> records = consumer.poll(50);
+                        for (ConsumerRecord<byte[], byte[]> record : records.records(TOPIC_NAME)) {
+                            int messageIndex = ByteBuffer.wrap(record.key()).getInt();
                             messagesReceived++;
                             if (toReceiveTracker.removePending(messageIndex)) {
                                 uniqueMessagesReceived++;
diff --git a/tools/src/test/java/org/apache/kafka/trogdor/workload/PayloadGeneratorTest.java b/tools/src/test/java/org/apache/kafka/trogdor/workload/PayloadGeneratorTest.java
new file mode 100644
index 0000000..d2954a5
--- /dev/null
+++ b/tools/src/test/java/org/apache/kafka/trogdor/workload/PayloadGeneratorTest.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.trogdor.workload;
+
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.junit.Test;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+
+public class PayloadGeneratorTest {
+
+    @Test
+    public void testGeneratorStartsAtPositionZero() {
+        PayloadGenerator payloadGenerator = new PayloadGenerator();
+        assertEquals(0, payloadGenerator.position());
+    }
+
+    @Test
+    public void testDefaultPayload() {
+        final long numRecords = 262;
+        PayloadGenerator payloadGenerator = new PayloadGenerator();
+
+        // make sure that each time we produce a different value (except if compression rate is 0)
+        byte[] prevValue = null;
+        long expectedPosition = 0;
+        for (int i = 0; i < numRecords; i++) {
+            ProducerRecord<byte[], byte[]> record = payloadGenerator.nextRecord("test-topic");
+            assertNull(record.key());
+            assertEquals(PayloadGenerator.DEFAULT_MESSAGE_SIZE, record.value().length);
+            assertEquals(++expectedPosition, payloadGenerator.position());
+            assertFalse("Position " + payloadGenerator.position(),
+                        Arrays.equals(prevValue, record.value()));
+            prevValue = record.value().clone();
+        }
+    }
+
+    @Test
+    public void testNullKeyTypeValueSizeIsMessageSize() {
+        final int size = 200;
+        PayloadGenerator payloadGenerator = new PayloadGenerator(size);
+        ProducerRecord<byte[], byte[]> record = payloadGenerator.nextRecord("test-topic");
+        assertNull(record.key());
+        assertEquals(size, record.value().length);
+    }
+
+    @Test
+    public void testKeyContainsGeneratorPosition() {
+        final long numRecords = 10;
+        final int size = 200;
+        PayloadGenerator generator = new PayloadGenerator(size, PayloadKeyType.KEY_MESSAGE_INDEX);
+        for (int i = 0; i < numRecords; i++) {
+            assertEquals(i, generator.position());
+            ProducerRecord<byte[], byte[]> record = generator.nextRecord("test-topic");
+            assertEquals(8, record.key().length);
+            assertEquals(size - 8, record.value().length);
+            assertEquals("i=" + i, i, ByteBuffer.wrap(record.key()).getLong());
+        }
+    }
+
+    @Test
+    public void testGeneratePayloadWithExplicitPosition() {
+        final int size = 200;
+        PayloadGenerator generator = new PayloadGenerator(size, PayloadKeyType.KEY_MESSAGE_INDEX);
+        int position = 2;
+        while (position < 5000000) {
+            ProducerRecord<byte[], byte[]> record = generator.nextRecord("test-topic", position);
+            assertEquals(8, record.key().length);
+            assertEquals(size - 8, record.value().length);
+            assertEquals(position, ByteBuffer.wrap(record.key()).getLong());
+            position = position * 64;
+        }
+    }
+
+    public void testSamePositionGeneratesSameKeyAndValue() {
+        final int size = 100;
+        PayloadGenerator generator = new PayloadGenerator(size, PayloadKeyType.KEY_MESSAGE_INDEX);
+        ProducerRecord<byte[], byte[]> record1 = generator.nextRecord("test-topic");
+        assertEquals(1, generator.position());
+        ProducerRecord<byte[], byte[]> record2 = generator.nextRecord("test-topic");
+        assertEquals(2, generator.position());
+        ProducerRecord<byte[], byte[]> record3 = generator.nextRecord("test-topic", 0);
+        // position should not change if we generated record with specific position
+        assertEquals(2, generator.position());
+        assertFalse("Values at different positions should not match.",
+                    Arrays.equals(record1.value(), record2.value()));
+        assertFalse("Values at different positions should not match.",
+                    Arrays.equals(record3.value(), record2.value()));
+        assertTrue("Values at the same position should match.",
+                   Arrays.equals(record1.value(), record3.value()));
+    }
+
+    @Test
+    public void testGeneratesDeterministicKeyValues() {
+        final long numRecords = 194;
+        final int size = 100;
+        PayloadGenerator generator1 = new PayloadGenerator(size, PayloadKeyType.KEY_MESSAGE_INDEX);
+        PayloadGenerator generator2 = new PayloadGenerator(size, PayloadKeyType.KEY_MESSAGE_INDEX);
+        for (int i = 0; i < numRecords; ++i) {
+            ProducerRecord<byte[], byte[]> record1 = generator1.nextRecord("test-topic");
+            ProducerRecord<byte[], byte[]> record2 = generator2.nextRecord("test-topic");
+            assertTrue(Arrays.equals(record1.value(), record2.value()));
+            assertTrue(Arrays.equals(record1.key(), record2.key()));
+        }
+    }
+
+    @Test
+    public void testTooSmallMessageSizeCreatesPayloadWithOneByteValues() {
+        PayloadGenerator payloadGenerator = new PayloadGenerator(2, PayloadKeyType.KEY_MESSAGE_INDEX);
+        ProducerRecord<byte[], byte[]> record = payloadGenerator.nextRecord("test-topic", 877);
+        assertEquals(8, record.key().length);
+        assertEquals(0, record.value().length);
+    }
+
+    @Test
+    public void testNextRecordGeneratesNewByteArrayForValue() {
+        PayloadGenerator payloadGenerator = new PayloadGenerator(2, PayloadKeyType.KEY_MESSAGE_INDEX);
+        ProducerRecord<byte[], byte[]> record1 = payloadGenerator.nextRecord("test-topic", 877);
+        ProducerRecord<byte[], byte[]> record2 = payloadGenerator.nextRecord("test-topic", 877);
+        assertNotEquals(record1.value(), record2.value());
+    }
+}

-- 
To stop receiving notification emails like this one, please contact
ijuma@apache.org.