You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ij...@apache.org on 2018/03/09 21:57:08 UTC
[kafka] branch trunk updated: MINOR: Add PayloadGenerator to
Trogdor (#4640)
This is an automated email from the ASF dual-hosted git repository.
ijuma pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new f1c112c MINOR: Add PayloadGenerator to Trogdor (#4640)
f1c112c is described below
commit f1c112c63d8223e62cf9e2f31cc42321f1e6dada
Author: Anna Povzner <an...@confluent.io>
AuthorDate: Fri Mar 9 13:57:04 2018 -0800
MINOR: Add PayloadGenerator to Trogdor (#4640)
It generates the producer payload (key and value) and makes sure that the values are
populated to target a realistic compression rate (0.3 - 0.4) if compression is used.
The generated payload is deterministic and can be replayed from a given position.
For now, all generated values are constant size, and key types can be configured
to be either null or 8 bytes.
Added messageSize parameter to producer spec, that specifies produced
key + message size.
---
.../kafka/trogdor/workload/PayloadGenerator.java | 146 +++++++++++++++++++++
.../kafka/trogdor/workload/PayloadKeyType.java | 39 ++++++
.../kafka/trogdor/workload/ProduceBenchSpec.java | 8 ++
.../kafka/trogdor/workload/ProduceBenchWorker.java | 10 +-
.../kafka/trogdor/workload/RoundTripWorker.java | 27 ++--
.../trogdor/workload/PayloadGeneratorTest.java | 144 ++++++++++++++++++++
6 files changed, 356 insertions(+), 18 deletions(-)
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/workload/PayloadGenerator.java b/tools/src/main/java/org/apache/kafka/trogdor/workload/PayloadGenerator.java
new file mode 100644
index 0000000..9acd5fa
--- /dev/null
+++ b/tools/src/main/java/org/apache/kafka/trogdor/workload/PayloadGenerator.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.trogdor.workload;
+
+import org.apache.kafka.clients.producer.ProducerRecord;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Random;
+
+/**
+ * Describes the payload for the producer record. Currently, it generates constant size values
+ * and either null keys or constant size key (depending on requested key type). The generator
+ * is deterministic -- two generator objects created with the same key type, message size, and
+ * value divergence ratio (see `valueDivergenceRatio` description) will generate the same sequence
+ * of key/value pairs.
+ */
+public class PayloadGenerator {
+
+ public static final double DEFAULT_VALUE_DIVERGENCE_RATIO = 0.3;
+ public static final int DEFAULT_MESSAGE_SIZE = 512;
+
+ /**
+ * This is the ratio of how much each next value is different from the previous value. This
+ * is directly related to compression rate we will get. Example: 0.3 divergence ratio gets us
+ * about 0.3 - 0.45 compression rate with lz4.
+ */
+ private final double valueDivergenceRatio;
+ private final long baseSeed;
+ private long currentPosition;
+ private byte[] baseRecordValue;
+ private PayloadKeyType recordKeyType;
+ private Random random;
+
+ public PayloadGenerator() {
+ this(DEFAULT_MESSAGE_SIZE, PayloadKeyType.KEY_NULL, DEFAULT_VALUE_DIVERGENCE_RATIO);
+ }
+
+ /**
+ * Generator will generate null keys and values of size `messageSize`
+ * @param messageSize number of bytes used for key + value
+ */
+ public PayloadGenerator(int messageSize) {
+ this(messageSize, PayloadKeyType.KEY_NULL, DEFAULT_VALUE_DIVERGENCE_RATIO);
+ }
+
+ /**
+ * Generator will generate keys of given type and values of size 'messageSize' - (key size).
+ * If the given key type requires more bytes than messageSize, then the resulting payload
+ * will be keys of size required for the given key type and 0-length values.
+ * @param messageSize number of bytes used for key + value
+ * @param keyType type of keys generated
+ */
+ public PayloadGenerator(int messageSize, PayloadKeyType keyType) {
+ this(messageSize, keyType, DEFAULT_VALUE_DIVERGENCE_RATIO);
+ }
+
+ /**
+ * Generator will generate keys of given type and values of size 'messageSize' - (key size).
+ * If the given key type requires more bytes than messageSize, then the resulting payload
+ * will be keys of size required for the given key type and 0-length values.
+ * @param messageSize key + value size
+ * @param valueDivergenceRatio ratio of how much each next value is different from the previous
+ * value. Used to approximately control target compression rate (if
+ * compression is used).
+ */
+ public PayloadGenerator(int messageSize, PayloadKeyType keyType,
+ double valueDivergenceRatio) {
+ this.baseSeed = 856; // some random number, may later let pass seed to constructor
+ this.currentPosition = 0;
+ this.valueDivergenceRatio = valueDivergenceRatio;
+ this.random = new Random(this.baseSeed);
+
+ final int valueSize = (messageSize > keyType.maxSizeInBytes())
+ ? messageSize - keyType.maxSizeInBytes() : 0;
+ this.baseRecordValue = new byte[valueSize];
+ // initialize value with random bytes
+ for (int i = 0; i < baseRecordValue.length; ++i) {
+ baseRecordValue[i] = (byte) (random.nextInt(26) + 65);
+ }
+ this.recordKeyType = keyType;
+ }
+
+ /**
+ * Returns current position of the payload generator.
+ */
+ public long position() {
+ return currentPosition;
+ }
+
+ /**
+ * Creates record based on the current position, and increments current position.
+ */
+ public ProducerRecord<byte[], byte[]> nextRecord(String topicName) {
+ return nextRecord(topicName, currentPosition++);
+ }
+
+ /**
+ * Creates record based on the given position. Does not change the current position.
+ */
+ public ProducerRecord<byte[], byte[]> nextRecord(String topicName, long position) {
+ byte[] keyBytes = null;
+ if (recordKeyType == PayloadKeyType.KEY_MESSAGE_INDEX) {
+ keyBytes = ByteBuffer.allocate(recordKeyType.maxSizeInBytes()).putLong(position).array();
+ } else if (recordKeyType != PayloadKeyType.KEY_NULL) {
+ throw new UnsupportedOperationException(
+ "PayloadGenerator does not know how to generate key for key type " + recordKeyType);
+ }
+ return new ProducerRecord<>(topicName, keyBytes, nextValue(position));
+ }
+
+ @Override
+ public String toString() {
+ return "PayloadGenerator(recordKeySize=" + recordKeyType.maxSizeInBytes()
+ + ", recordValueSize=" + baseRecordValue.length
+ + ", valueDivergenceRatio=" + valueDivergenceRatio + ")";
+ }
+
+ /**
+ * Returns producer record value
+ */
+ private byte[] nextValue(long position) {
+ // set the seed based on the given position to make sure that the same value is generated
+ // for the same position.
+ random.setSeed(baseSeed + 31 * position + 1);
+ // randomize some of the payload to achieve expected compression rate
+ byte[] recordValue = Arrays.copyOf(baseRecordValue, baseRecordValue.length);
+ for (int i = 0; i < recordValue.length * valueDivergenceRatio; ++i)
+ recordValue[i] = (byte) (random.nextInt(26) + 65);
+ return recordValue;
+ }
+}
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/workload/PayloadKeyType.java b/tools/src/main/java/org/apache/kafka/trogdor/workload/PayloadKeyType.java
new file mode 100644
index 0000000..3ed98cd
--- /dev/null
+++ b/tools/src/main/java/org/apache/kafka/trogdor/workload/PayloadKeyType.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.trogdor.workload;
+
+/**
+ * Describes a key in producer payload
+ */
+public enum PayloadKeyType {
+ // null key
+ KEY_NULL(0),
+ // fixed size key containing a long integer representing a message index (i.e., position of
+ // the payload generator)
+ KEY_MESSAGE_INDEX(8);
+
+ private final int maxSizeInBytes;
+
+ PayloadKeyType(int maxSizeInBytes) {
+ this.maxSizeInBytes = maxSizeInBytes;
+ }
+
+ public int maxSizeInBytes() {
+ return maxSizeInBytes;
+ }
+}
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/workload/ProduceBenchSpec.java b/tools/src/main/java/org/apache/kafka/trogdor/workload/ProduceBenchSpec.java
index 9f25842..efb2d85 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/workload/ProduceBenchSpec.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/workload/ProduceBenchSpec.java
@@ -36,6 +36,7 @@ public class ProduceBenchSpec extends TaskSpec {
private final String bootstrapServers;
private final int targetMessagesPerSec;
private final int maxMessages;
+ private final int messageSize;
private final Map<String, String> producerConf;
private final int totalTopics;
private final int activeTopics;
@@ -47,6 +48,7 @@ public class ProduceBenchSpec extends TaskSpec {
@JsonProperty("bootstrapServers") String bootstrapServers,
@JsonProperty("targetMessagesPerSec") int targetMessagesPerSec,
@JsonProperty("maxMessages") int maxMessages,
+ @JsonProperty("messageSize") int messageSize,
@JsonProperty("producerConf") Map<String, String> producerConf,
@JsonProperty("totalTopics") int totalTopics,
@JsonProperty("activeTopics") int activeTopics) {
@@ -55,6 +57,7 @@ public class ProduceBenchSpec extends TaskSpec {
this.bootstrapServers = bootstrapServers;
this.targetMessagesPerSec = targetMessagesPerSec;
this.maxMessages = maxMessages;
+ this.messageSize = (messageSize == 0) ? PayloadGenerator.DEFAULT_MESSAGE_SIZE : messageSize;
this.producerConf = producerConf;
this.totalTopics = totalTopics;
this.activeTopics = activeTopics;
@@ -81,6 +84,11 @@ public class ProduceBenchSpec extends TaskSpec {
}
@JsonProperty
+ public int messageSize() {
+ return messageSize;
+ }
+
+ @JsonProperty
public Map<String, String> producerConf() {
return producerConf;
}
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/workload/ProduceBenchWorker.java b/tools/src/main/java/org/apache/kafka/trogdor/workload/ProduceBenchWorker.java
index 27e49cd..1bd386d 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/workload/ProduceBenchWorker.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/workload/ProduceBenchWorker.java
@@ -56,8 +56,6 @@ public class ProduceBenchWorker implements TaskWorker {
private static final short REPLICATION_FACTOR = 3;
- private static final int MESSAGE_SIZE = 512;
-
private static final int THROTTLE_PERIOD_MS = 100;
private final String id;
@@ -174,6 +172,8 @@ public class ProduceBenchWorker implements TaskWorker {
private final KafkaProducer<byte[], byte[]> producer;
+ private final PayloadGenerator payloadGenerator;
+
private final Throttle throttle;
SendRecords() {
@@ -187,6 +187,7 @@ public class ProduceBenchWorker implements TaskWorker {
props.setProperty(entry.getKey(), entry.getValue());
}
this.producer = new KafkaProducer<>(props, new ByteArraySerializer(), new ByteArraySerializer());
+ this.payloadGenerator = new PayloadGenerator(spec.messageSize());
this.throttle = new SendRecordsThrottle(perPeriod, producer);
}
@@ -194,13 +195,11 @@ public class ProduceBenchWorker implements TaskWorker {
public Void call() throws Exception {
long startTimeMs = Time.SYSTEM.milliseconds();
try {
- byte[] key = new byte[MESSAGE_SIZE];
- byte[] value = new byte[MESSAGE_SIZE];
Future<RecordMetadata> future = null;
try {
for (int m = 0; m < spec.maxMessages(); m++) {
for (int i = 0; i < spec.activeTopics(); i++) {
- ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(topicIndexToName(i), key, value);
+ ProducerRecord<byte[], byte[]> record = payloadGenerator.nextRecord(topicIndexToName(i));
future = producer.send(record, new SendRecordsCallback(this, Time.SYSTEM.milliseconds()));
}
throttle.increment();
@@ -217,6 +216,7 @@ public class ProduceBenchWorker implements TaskWorker {
statusUpdaterFuture.cancel(false);
new StatusUpdater(histogram).run();
long curTimeMs = Time.SYSTEM.milliseconds();
+ log.info("Produced {}", payloadGenerator);
log.info("Sent {} total record(s) in {} ms. status: {}",
histogram.summarize().numSamples(), curTimeMs - startTimeMs, status.get());
}
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/workload/RoundTripWorker.java b/tools/src/main/java/org/apache/kafka/trogdor/workload/RoundTripWorker.java
index 9031c45..5dfac1f 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/workload/RoundTripWorker.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/workload/RoundTripWorker.java
@@ -33,8 +33,6 @@ import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.internals.KafkaFutureImpl;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.ByteArraySerializer;
-import org.apache.kafka.common.serialization.StringDeserializer;
-import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.trogdor.common.Platform;
@@ -44,6 +42,7 @@ import org.apache.kafka.trogdor.task.TaskWorker;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
@@ -60,7 +59,7 @@ import java.util.concurrent.atomic.AtomicReference;
public class RoundTripWorker implements TaskWorker {
private static final int THROTTLE_PERIOD_MS = 100;
- private static final int VALUE_SIZE = 512;
+ private static final int MESSAGE_SIZE = 512;
private static final int LOG_INTERVAL_MS = 5000;
@@ -82,9 +81,11 @@ public class RoundTripWorker implements TaskWorker {
private KafkaFutureImpl<String> doneFuture;
- private KafkaProducer<String, byte[]> producer;
+ private KafkaProducer<byte[], byte[]> producer;
- private KafkaConsumer<String, byte[]> consumer;
+ private PayloadGenerator payloadGenerator;
+
+ private KafkaConsumer<byte[], byte[]> consumer;
private CountDownLatch unackedSends;
@@ -177,16 +178,16 @@ public class RoundTripWorker implements TaskWorker {
props.put(ProducerConfig.CLIENT_ID_CONFIG, "producer." + id);
props.put(ProducerConfig.ACKS_CONFIG, "all");
props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 105000);
- producer = new KafkaProducer<>(props, new StringSerializer(),
+ producer = new KafkaProducer<>(props, new ByteArraySerializer(),
new ByteArraySerializer());
int perPeriod = WorkerUtils.
perSecToPerPeriod(spec.targetMessagesPerSec(), THROTTLE_PERIOD_MS);
this.throttle = new Throttle(perPeriod, THROTTLE_PERIOD_MS);
+ payloadGenerator = new PayloadGenerator(MESSAGE_SIZE, PayloadKeyType.KEY_MESSAGE_INDEX);
}
@Override
public void run() {
- byte[] value = new byte[VALUE_SIZE];
final ToSendTracker toSendTracker = new ToSendTracker(spec.maxMessages());
long messagesSent = 0;
long uniqueMessagesSent = 0;
@@ -204,8 +205,8 @@ public class RoundTripWorker implements TaskWorker {
uniqueMessagesSent++;
}
messagesSent++;
- ProducerRecord<String, byte[]> record =
- new ProducerRecord<>(TOPIC_NAME, 0, String.valueOf(messageIndex), value);
+ // we explicitly specify generator position based on message index
+ ProducerRecord<byte[], byte[]> record = payloadGenerator.nextRecord(TOPIC_NAME, messageIndex);
producer.send(record, new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
@@ -267,7 +268,7 @@ public class RoundTripWorker implements TaskWorker {
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, 105000);
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 100000);
- consumer = new KafkaConsumer<>(props, new StringDeserializer(),
+ consumer = new KafkaConsumer<>(props, new ByteArrayDeserializer(),
new ByteArrayDeserializer());
consumer.subscribe(Collections.singleton(TOPIC_NAME));
}
@@ -283,9 +284,9 @@ public class RoundTripWorker implements TaskWorker {
while (true) {
try {
pollInvoked++;
- ConsumerRecords<String, byte[]> records = consumer.poll(50);
- for (ConsumerRecord<String, byte[]> record : records.records(TOPIC_NAME)) {
- int messageIndex = Integer.parseInt(record.key());
+ ConsumerRecords<byte[], byte[]> records = consumer.poll(50);
+ for (ConsumerRecord<byte[], byte[]> record : records.records(TOPIC_NAME)) {
+ int messageIndex = ByteBuffer.wrap(record.key()).getInt();
messagesReceived++;
if (toReceiveTracker.removePending(messageIndex)) {
uniqueMessagesReceived++;
diff --git a/tools/src/test/java/org/apache/kafka/trogdor/workload/PayloadGeneratorTest.java b/tools/src/test/java/org/apache/kafka/trogdor/workload/PayloadGeneratorTest.java
new file mode 100644
index 0000000..d2954a5
--- /dev/null
+++ b/tools/src/test/java/org/apache/kafka/trogdor/workload/PayloadGeneratorTest.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.trogdor.workload;
+
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.junit.Test;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+
+public class PayloadGeneratorTest {
+
+ @Test
+ public void testGeneratorStartsAtPositionZero() {
+ PayloadGenerator payloadGenerator = new PayloadGenerator();
+ assertEquals(0, payloadGenerator.position());
+ }
+
+ @Test
+ public void testDefaultPayload() {
+ final long numRecords = 262;
+ PayloadGenerator payloadGenerator = new PayloadGenerator();
+
+ // make sure that each time we produce a different value (except if compression rate is 0)
+ byte[] prevValue = null;
+ long expectedPosition = 0;
+ for (int i = 0; i < numRecords; i++) {
+ ProducerRecord<byte[], byte[]> record = payloadGenerator.nextRecord("test-topic");
+ assertNull(record.key());
+ assertEquals(PayloadGenerator.DEFAULT_MESSAGE_SIZE, record.value().length);
+ assertEquals(++expectedPosition, payloadGenerator.position());
+ assertFalse("Position " + payloadGenerator.position(),
+ Arrays.equals(prevValue, record.value()));
+ prevValue = record.value().clone();
+ }
+ }
+
+ @Test
+ public void testNullKeyTypeValueSizeIsMessageSize() {
+ final int size = 200;
+ PayloadGenerator payloadGenerator = new PayloadGenerator(size);
+ ProducerRecord<byte[], byte[]> record = payloadGenerator.nextRecord("test-topic");
+ assertNull(record.key());
+ assertEquals(size, record.value().length);
+ }
+
+ @Test
+ public void testKeyContainsGeneratorPosition() {
+ final long numRecords = 10;
+ final int size = 200;
+ PayloadGenerator generator = new PayloadGenerator(size, PayloadKeyType.KEY_MESSAGE_INDEX);
+ for (int i = 0; i < numRecords; i++) {
+ assertEquals(i, generator.position());
+ ProducerRecord<byte[], byte[]> record = generator.nextRecord("test-topic");
+ assertEquals(8, record.key().length);
+ assertEquals(size - 8, record.value().length);
+ assertEquals("i=" + i, i, ByteBuffer.wrap(record.key()).getLong());
+ }
+ }
+
+ @Test
+ public void testGeneratePayloadWithExplicitPosition() {
+ final int size = 200;
+ PayloadGenerator generator = new PayloadGenerator(size, PayloadKeyType.KEY_MESSAGE_INDEX);
+ int position = 2;
+ while (position < 5000000) {
+ ProducerRecord<byte[], byte[]> record = generator.nextRecord("test-topic", position);
+ assertEquals(8, record.key().length);
+ assertEquals(size - 8, record.value().length);
+ assertEquals(position, ByteBuffer.wrap(record.key()).getLong());
+ position = position * 64;
+ }
+ }
+
+ public void testSamePositionGeneratesSameKeyAndValue() {
+ final int size = 100;
+ PayloadGenerator generator = new PayloadGenerator(size, PayloadKeyType.KEY_MESSAGE_INDEX);
+ ProducerRecord<byte[], byte[]> record1 = generator.nextRecord("test-topic");
+ assertEquals(1, generator.position());
+ ProducerRecord<byte[], byte[]> record2 = generator.nextRecord("test-topic");
+ assertEquals(2, generator.position());
+ ProducerRecord<byte[], byte[]> record3 = generator.nextRecord("test-topic", 0);
+ // position should not change if we generated record with specific position
+ assertEquals(2, generator.position());
+ assertFalse("Values at different positions should not match.",
+ Arrays.equals(record1.value(), record2.value()));
+ assertFalse("Values at different positions should not match.",
+ Arrays.equals(record3.value(), record2.value()));
+ assertTrue("Values at the same position should match.",
+ Arrays.equals(record1.value(), record3.value()));
+ }
+
+ @Test
+ public void testGeneratesDeterministicKeyValues() {
+ final long numRecords = 194;
+ final int size = 100;
+ PayloadGenerator generator1 = new PayloadGenerator(size, PayloadKeyType.KEY_MESSAGE_INDEX);
+ PayloadGenerator generator2 = new PayloadGenerator(size, PayloadKeyType.KEY_MESSAGE_INDEX);
+ for (int i = 0; i < numRecords; ++i) {
+ ProducerRecord<byte[], byte[]> record1 = generator1.nextRecord("test-topic");
+ ProducerRecord<byte[], byte[]> record2 = generator2.nextRecord("test-topic");
+ assertTrue(Arrays.equals(record1.value(), record2.value()));
+ assertTrue(Arrays.equals(record1.key(), record2.key()));
+ }
+ }
+
+ @Test
+ public void testTooSmallMessageSizeCreatesPayloadWithOneByteValues() {
+ PayloadGenerator payloadGenerator = new PayloadGenerator(2, PayloadKeyType.KEY_MESSAGE_INDEX);
+ ProducerRecord<byte[], byte[]> record = payloadGenerator.nextRecord("test-topic", 877);
+ assertEquals(8, record.key().length);
+ assertEquals(0, record.value().length);
+ }
+
+ @Test
+ public void testNextRecordGeneratesNewByteArrayForValue() {
+ PayloadGenerator payloadGenerator = new PayloadGenerator(2, PayloadKeyType.KEY_MESSAGE_INDEX);
+ ProducerRecord<byte[], byte[]> record1 = payloadGenerator.nextRecord("test-topic", 877);
+ ProducerRecord<byte[], byte[]> record2 = payloadGenerator.nextRecord("test-topic", 877);
+ assertNotEquals(record1.value(), record2.value());
+ }
+}
--
To stop receiving notification emails like this one, please contact
ijuma@apache.org.