Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/12/11 19:53:30 UTC

[GitHub] [kafka] scott-hendricks opened a new pull request #9736: Add configurable workloads and E2E latency tracking to Trogdor.

scott-hendricks opened a new pull request #9736:
URL: https://github.com/apache/kafka/pull/9736


   This PR pulls together a couple of the outstanding one-pagers we have for Q4:
   
   https://confluentinc.atlassian.net/wiki/spaces/QERM/pages/1605992772
   https://confluentinc.atlassian.net/wiki/spaces/QERM/pages/1589186862
   
   This allows us to run highly granular and configurable workloads to directly simulate customer scenarios and measure the end to end latency all within Trogdor.
   
   This creates a new `ConfigurableProducer` workload that allows finer tuning of the workload than the `ProduceBench` workload does.
   
   This also creates all the helper classes for the `ConfigurableProducer` workload.
   
   This adds a new parameter to the `ConsumeBench` workload to allow for processing of records after polling them.
   
   This also adds a new E2E latency test utilizing the new record processor within the `ConsumeBench` workload.
   
   ---
   # ConfigurableProducer workload
   
   The ConfigurableProducer workload allows for customized and even variable configurations in terms of messages per second, message size, batch size, key size, and even the ability to target a specific partition out of a topic.
   
   The parameters that differ from ProduceBenchSpec:
   
   * `flushGenerator` - Used to instruct the KafkaProducer when to issue flushes.  This allows us to simulate variable batching since batch flushing is not currently exposed within the KafkaProducer class.
   * `throughputGenerator` - Used to throttle the ConfigurableProducerWorker based on a calculated number of messages within a window.
   * `activeTopic` - This class only supports execution against a single topic at a time.  If more than one topic is specified, the ConfigurableProducerWorker will throw an error.
   * `activePartition` - Specify a specific partition number within the activeTopic to run load against, or specify `-1` to allow use of all partitions.
   
   Here is an example spec:
   
   ```
   {
       "startMs": 1606949497662,
       "durationMs": 3600000,
       "producerNode": "trogdor-agent-0",
       "bootstrapServers": "some.example.kafka.server:9091",
       "flushGenerator": {
           "type": "gaussian",
           "messagesPerFlushAverage": 16,
           "messagesPerFlushDeviation": 4
       },
       "throughputGenerator": {
           "type": "gaussian",
           "messagesPerSecondAverage": 500,
           "messagesPerSecondDeviation": 50,
           "windowsUntilRateChange": 100,
           "windowSizeMs": 100
       },
       "keyGenerator": {
           "type": "constant",
           "size": 8
       },
       "valueGenerator": {
           "type": "gaussianTimestampRandom",
           "messageSizeAverage": 512,
           "messageSizeDeviation": 100,
           "timestampBytes": 8,
           "messagesUntilSizeChange": 100
       },
       "producerConf": {
           "acks": "all"
       },
       "commonClientConf": {},
       "adminClientConf": {},
       "activeTopic": {
           "topic0": {
               "numPartitions": 100,
               "replicationFactor": 3,
               "configs": {
                   "retention.ms": "1800000"
               }
           }
       },
       "activePartition": 5
   }
   ```
   
   This example spec performed the following:
   
   * Ran on `trogdor-agent-0` for 1 hour starting at 2020-12-02 22:51:37.662 GMT
   * Produced with acks=all to Partition 5 of `topic0` on kafka server `some.example.kafka.server:9091`.
   * The average batch had 16 messages, with a standard deviation of 4 messages.
   * Each message had an 8-byte constant key and a value averaging 512 bytes, with a standard deviation of 100 bytes.
   * The messages had millisecond timestamps embedded in the first 8 bytes of the value.
   * The average throughput was 500 messages/second, with a window of 100ms and a deviation of 50 messages/second.
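   
   As a quick check of the timing fields above, the following sketch converts `startMs` and `durationMs` into readable UTC instants.  The class and method names are hypothetical and exist only for illustration; they are not part of this PR.

   ```java
   import java.time.Duration;
   import java.time.Instant;

   // Illustrative helper only; not part of the Trogdor code in this PR.
   class SpecTimesSketch {
       // Formats the task's start time (epoch milliseconds) as an ISO-8601 UTC string.
       static String startUtc(long startMs) {
           return Instant.ofEpochMilli(startMs).toString();
       }

       // Formats the time at which the task would stop, given its duration.
       static String endUtc(long startMs, long durationMs) {
           return Instant.ofEpochMilli(startMs).plus(Duration.ofMillis(durationMs)).toString();
       }
   }
   ```

   For the spec above, `startUtc(1606949497662L)` gives `2020-12-02T22:51:37.662Z`, and adding the 3600000ms duration gives an end time one hour later.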
   
   ---
   
   # ConsumeBench workload changes
   
   This commit adds the ability for the ConsumeBench workloads to optionally process the records returned through the consumer poll call.  This is done by specifying a new `recordProcessor` parameter.  The record processor's status is then included in the workload's status.
   
   ## RecordProcessor
   
   This interface provides the ability to optionally process records after the ConsumeBench workload polls them.  The interface provides for the ability to include additional data in the status output.
   
   Currently there are 2 processing methods:
   
   * Disabled, by not specifying this parameter.
   * `timestamp` will use `TimestampRecordProcessor` to process records containing a timestamp in the first several bytes of the message.
   
   ### TimestampRecordProcessor
   
   The `TimestampRecordProcessor` class processes records containing a timestamp in the first several bytes of the message and generates a latency histogram from them.  The histogram is then included in the status from the `ConsumeBenchWorker` class.  This must be used with a timestamped PayloadGenerator implementation.
   
   Here's an example spec:
   
   ```
   {
      "type": "timestamp",
      "timestampBytes": 8,
      "histogramMaxMs": 10000,
      "histogramMinMs": 0,
      "histogramStepMs": 1
   }
   ```
   
   This will track total E2E latency up to 10 seconds, using 1ms resolution and a timestamp size of 8 bytes.
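   
   To make the idea concrete, here is a minimal sketch of how a record processor could turn an embedded timestamp into a histogram bucket.  It is not the `TimestampRecordProcessor` from this PR: the class name, the little-endian byte order, and the clamping of out-of-range samples into the first and last buckets are all assumptions made for illustration.

   ```java
   import java.nio.ByteBuffer;
   import java.nio.ByteOrder;

   // Hypothetical sketch; not the TimestampRecordProcessor from this PR.
   class LatencyHistogramSketch {
       private final long minMs;
       private final long stepMs;
       private final long[] buckets;

       LatencyHistogramSketch(long minMs, long maxMs, long stepMs) {
           this.minMs = minMs;
           this.stepMs = stepMs;
           // One bucket per step, plus a final bucket for samples at or above maxMs.
           this.buckets = new long[(int) ((maxMs - minMs) / stepMs) + 1];
       }

       // Reads the send time from the first 8 bytes of the value.
       // Little-endian order is an assumption; it must match the producer side.
       static long readTimestampMs(byte[] value) {
           return ByteBuffer.wrap(value, 0, Long.BYTES).order(ByteOrder.LITTLE_ENDIAN).getLong();
       }

       // Records one latency sample and returns the index of the bucket it landed in.
       int record(byte[] value, long receivedMs) {
           long latencyMs = receivedMs - readTimestampMs(value);
           long index = (latencyMs - minMs) / stepMs;
           int clamped = (int) Math.max(0, Math.min(buckets.length - 1, index));
           buckets[clamped]++;
           return clamped;
       }

       long count(int bucket) {
           return buckets[bucket];
       }
   }
   ```

   With `histogramMinMs` 0, `histogramMaxMs` 10000 and `histogramStepMs` 1, a record received 42ms after its embedded timestamp would be counted in bucket 42.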
   
   ---
   
   # FlushGenerator
   
   A FlushGenerator is used to facilitate flushing the KafkaProducers on a cadence specified by the user.  This is useful to simulate a specific number of messages in a batch regardless of the message size, since batch flushing is not exposed in the KafkaProducer.
   
   Currently there are 3 flushing methods:
   
   * Disabled, by not specifying this parameter.
   * `constant` will use `ConstantFlushGenerator` to keep the number of messages per batch constant.
   * `gaussian` will use `GaussianFlushGenerator` to vary the number of messages per batch on a normal distribution.
   
   ## ConstantFlushGenerator
   
   This generator will flush the producer after a specific number of messages.  This does not directly control when KafkaProducer will batch; it is only a best effort.  It also cannot tell when a KafkaProducer batch is closed.  If the KafkaProducer sends a batch before the generator triggers a flush, the generator continues on its own cadence.
   
   Here is an example spec:
   
   ```
   {
      "type": "constant",
      "messagesPerFlush": 16
   }
   ```
   
   This example will flush the producer every 16 messages.
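   
   The cadence described above can be sketched as a simple counter.  This is an illustrative stand-in rather than the `ConstantFlushGenerator` in this PR: the class name is hypothetical, and the flush action is passed in as a `Runnable` so the sketch compiles without the Kafka client on the classpath (a real worker would pass something like `producer::flush`).

   ```java
   // Hypothetical sketch of flushing on a fixed message cadence.
   class ConstantFlushSketch {
       private final int messagesPerFlush;
       private final Runnable flushAction; // e.g. producer::flush in a real worker
       private int sinceLastFlush = 0;

       ConstantFlushSketch(int messagesPerFlush, Runnable flushAction) {
           this.messagesPerFlush = messagesPerFlush;
           this.flushAction = flushAction;
       }

       // Call once after each send; returns true when a flush was issued.
       boolean onMessageSent() {
           sinceLastFlush++;
           if (sinceLastFlush >= messagesPerFlush) {
               flushAction.run();
               sinceLastFlush = 0;
               return true;
           }
           return false;
       }
   }
   ```

   Note that the counter knows nothing about batches the producer may already have sent on its own, which is why the cadence is only a best effort.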
   
   ## GaussianFlushGenerator
   
   This generator will flush the producer after a specific number of messages, determined by a gaussian distribution.  This does not directly control when KafkaProducer will batch; it is only a best effort.  It also cannot tell when a KafkaProducer batch is closed.  If the KafkaProducer sends a batch before the generator triggers a flush, the generator continues on its own cadence.
   
   Here is an example spec:
   
   ```
   {
      "type": "gaussian",
      "messagesPerFlushAverage": 16,
      "messagesPerFlushDeviation": 4
   }
   ```
   
   This example will flush the producer every 16 messages on average, assuming `linger.ms` and `batch.size` allow for it.  The number of messages before the next flush is redrawn from a normal distribution after each flush:
   
   * On average, a flush occurs every 16 messages.
   * ~68% of the flushes occur after between 12 and 20 messages.
   * ~95% of the flushes occur after between 8 and 24 messages.
   * ~99.7% of the flushes occur after between 4 and 28 messages.
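   
   A rough sketch of drawing the flush point from a normal distribution follows.  It is not the `GaussianFlushGenerator` from this PR; the class name, the explicit seed, and the floor of one message per flush are assumptions for illustration.

   ```java
   import java.util.Random;

   // Hypothetical sketch of a gaussian flush cadence.
   class GaussianFlushSketch {
       private final Random random;
       private final int average;
       private final int deviation;
       private int target;
       private int sinceLastFlush = 0;

       GaussianFlushSketch(int average, int deviation, long seed) {
           this.random = new Random(seed);
           this.average = average;
           this.deviation = deviation;
           this.target = nextTarget();
       }

       // Draws how many messages to send before the next flush; never fewer than 1.
       private int nextTarget() {
           long drawn = Math.round(random.nextGaussian() * deviation + average);
           return (int) Math.max(1, drawn);
       }

       // Call once after each send; returns true when the caller should flush now.
       boolean onMessageSent() {
           sinceLastFlush++;
           if (sinceLastFlush < target) {
               return false;
           }
           sinceLastFlush = 0;
           target = nextTarget(); // a new flush point is drawn after every flush
           return true;
       }
   }
   ```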
   
   ---
   
   # ThroughputGenerator
   
   This is similar to the `Throttle` class, but with a simpler design.  This interface is used to facilitate running a configurable number of messages per second by throttling if the throughput goes above a certain amount.
   
   Currently there are 2 throughput methods:
   
   * `constant` will use `ConstantThroughputGenerator` to keep the number of messages per second constant.
   * `gaussian` will use `GaussianThroughputGenerator` to vary the number of messages per second on a normal distribution.
   
   ## ConstantThroughputGenerator
   
   This throughput generator configures constant throughput.  The lower the window size, the smoother the traffic will be. Using a 100ms window offers no noticeable spikes in traffic while still being long enough to avoid too much overhead.
   
   Because a window can only hold a whole number of messages, each window sends at least 1 message, and every window sends the same number of messages, rounded down. For example, 99 messages per second with a 100ms window yields 9 messages per window, or only 90 messages per second. As another example, sending only 5 messages per second requires a window size of 200ms, since a 100ms window would still send 1 message per window, or 10 messages per second. In cases like these, both the `messagesPerSecond` and `windowSizeMs` parameters should be adjusted together to achieve more accurate throughput.
   
   Here is an example spec:
   
   ```
   {
      "type": "constant",
      "messagesPerSecond": 500,
      "windowSizeMs": 100
   }
   ```
   
   This will produce a workload that runs 500 messages per second, with a granularity of 50 messages per 100-millisecond window.
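   
   The rounding behavior described in this section can be worked through in a few lines.  This sketch only reproduces the arithmetic stated above (round down, but send at least one message per window); the class and method names are hypothetical, and it makes no claim about how `ConstantThroughputGenerator` is actually written.

   ```java
   // Hypothetical sketch of the per-window arithmetic described above.
   class WindowMathSketch {
       // Messages allowed per window: rounded down, but never fewer than one.
       static int messagesPerWindow(int messagesPerSecond, int windowSizeMs) {
           return (int) Math.max(1, (long) messagesPerSecond * windowSizeMs / 1000);
       }

       // The rate actually achieved once the per-window count is a whole number.
       static double effectiveMessagesPerSecond(int messagesPerSecond, int windowSizeMs) {
           return messagesPerWindow(messagesPerSecond, windowSizeMs) * 1000.0 / windowSizeMs;
       }
   }
   ```

   Under these assumptions, `effectiveMessagesPerSecond(99, 100)` is 90.0, `effectiveMessagesPerSecond(5, 100)` is 10.0, and `effectiveMessagesPerSecond(5, 200)` is 5.0, matching the examples in the text.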
   
   ## GaussianThroughputGenerator
   
   This throughput generator configures throughput with a gaussian normal distribution on a per-window basis. You can specify how many windows to keep the throughput at the rate before changing. All traffic will follow a gaussian distribution centered around `messagesPerSecondAverage` with a deviation of `messagesPerSecondDeviation`.  The lower the window size, the smoother the traffic will be. Using a 100ms window offers no noticeable spikes in traffic while still being long enough to avoid too much overhead.
   
   Because a window can only hold a whole number of messages, this does not work well for an average throughput of less than 5 messages per window.  In cases where you want lower throughput, the `windowSizeMs` must be adjusted accordingly.
   
   Here is an example spec:
   
   ```
   {
      "type": "gaussian",
      "messagesPerSecondAverage": 500,
      "messagesPerSecondDeviation": 50,
      "windowsUntilRateChange": 100,
      "windowSizeMs": 100
   }
   ```
   
   This will produce a workload that averages 500 messages per second; the rate changes every 10 seconds (`windowSizeMs * windowsUntilRateChange` = 100ms * 100). The throughput will have the following normal distribution:
   
   * The throughput windows average 500 messages per second.
   * ~68% of the throughput windows are between 450 and 550 messages per second.
   * ~95% of the throughput windows are between 400 and 600 messages per second.
   * ~99.7% of the throughput windows are between 350 and 650 messages per second.
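   
   One way to picture the per-window behavior is the sketch below, which redraws the rate once every `windowsUntilRateChange` windows and converts it to a whole number of messages per window.  It is an illustration under stated assumptions, not the `GaussianThroughputGenerator` from this PR: the class name, the explicit seed, and the floor of one message per window are all hypothetical.

   ```java
   import java.util.Random;

   // Hypothetical sketch of a gaussian per-window message budget.
   class GaussianThroughputSketch {
       private final Random random;
       private final int averagePerSecond;
       private final int deviationPerSecond;
       private final int windowsUntilRateChange;
       private final int windowSizeMs;
       private int windowsSinceChange = 0;
       private int currentPerWindow;

       GaussianThroughputSketch(int averagePerSecond, int deviationPerSecond,
                                int windowsUntilRateChange, int windowSizeMs, long seed) {
           this.random = new Random(seed);
           this.averagePerSecond = averagePerSecond;
           this.deviationPerSecond = deviationPerSecond;
           this.windowsUntilRateChange = windowsUntilRateChange;
           this.windowSizeMs = windowSizeMs;
       }

       // Returns how many messages may be sent in the next window.
       int nextWindowBudget() {
           if (windowsSinceChange == 0) {
               // Draw a new rate, then scale it to one window and round down (minimum 1).
               double perSecond = random.nextGaussian() * deviationPerSecond + averagePerSecond;
               currentPerWindow = (int) Math.max(1, (long) (perSecond * windowSizeMs / 1000));
           }
           windowsSinceChange = (windowsSinceChange + 1) % windowsUntilRateChange;
           return currentPerWindow;
       }
   }
   ```

   With the example spec, each drawn rate would be held for 100 windows of 100ms, which is the 10-second interval mentioned above.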
   
   ---
   
   # Additional Payload Generators
   
   This implementation also offers additional payload generators to facilitate the tests these workloads are designed to run.  These are also compatible with the existing ProduceBench workloads.
   
   ## TimestampRandomPayloadGenerator
   
   This generator generates timestamped pseudo-random payloads that can be reproduced from run to run.  The guarantees are the same as those of java.util.Random.  The timestamp used for this class is in milliseconds since epoch, encoded directly to the first several bytes of the payload. This should be used in conjunction with TimestampRecordProcessor in the Consumer to measure true end-to-end latency of a system.
   
   * `size` - The size in bytes of each message.
   * `timestampBytes` - The number of bytes to use for the timestamp.  Usually 8.
   * `seed` - Used to initialize Random() to remove some non-determinism.
   
   Here is an example spec:
   
   ```
   {
      "type": "timestampRandom",
      "size": 512,
      "timestampBytes": 8
   }
   ```
   
   This will generate a 512-byte random message with the first 8 bytes encoded with the timestamp.
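   
   As an illustration of the payload layout, the sketch below fills a buffer with seeded pseudo-random bytes and then overwrites the first 8 bytes with a millisecond timestamp.  It is not the `TimestampRandomPayloadGenerator` from this PR: the class name, the way the seed is combined with the message position, and the little-endian byte order are assumptions.

   ```java
   import java.nio.ByteBuffer;
   import java.nio.ByteOrder;
   import java.util.Random;

   // Hypothetical sketch of a timestamped pseudo-random payload.
   class TimestampPayloadSketch {
       // Builds a payload of `size` bytes whose first 8 bytes hold `nowMs`.
       static byte[] generate(int size, long seed, long position, long nowMs) {
           if (size < Long.BYTES) {
               throw new IllegalArgumentException("size must be at least 8 bytes");
           }
           byte[] payload = new byte[size];
           // Seeding from (seed + position) keeps the random bytes reproducible per message.
           new Random(seed + position).nextBytes(payload);
           // Byte order is an assumption; the consumer must decode with the same order.
           ByteBuffer.wrap(payload).order(ByteOrder.LITTLE_ENDIAN).putLong(nowMs);
           return payload;
       }
   }
   ```

   Only the timestamp bytes differ between two runs with the same seed and position, so the rest of the payload stays reproducible.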
   
   ## GaussianTimestampRandomPayloadGenerator
   
   This class behaves identically to TimestampRandomPayloadGenerator, except the message size follows a gaussian distribution.  This should be used in conjunction with TimestampRecordProcessor in the Consumer to measure true end-to-end latency of a system.
   
   * `messageSizeAverage` - The average size in bytes of each message.
   * `messageSizeDeviation` - The standard deviation to use when calculating message size.
   * `timestampBytes` - The number of bytes to use for the timestamp.  Usually 8.
   * `messagesUntilSizeChange` - The number of messages to keep at the same size.
   * `seed` - Used to initialize Random() to remove some non-determinism.
   
   Here is an example spec:
   
   ```
   {
      "type": "gaussianTimestampRandom",
      "messageSizeAverage": 512,
      "messageSizeDeviation": 100,
      "timestampBytes": 8,
      "messagesUntilSizeChange": 100
   }
   ```
   
   This will generate messages whose sizes follow a gaussian distribution with an average of 512 bytes, with the first 8 bytes encoded with the timestamp.  The message sizes will have a standard deviation of 100 bytes, and the size will only change every 100 messages.  The distribution of messages will be as follows:
   
   * The average size of the messages is 512 bytes.
   * ~68% of the messages are between 412 and 612 bytes.
   * ~95% of the messages are between 312 and 712 bytes.
   * ~99.7% of the messages are between 212 and 812 bytes.
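   
   The size selection can be sketched as follows: a size is drawn from the normal distribution and then reused for `messagesUntilSizeChange` messages before the next draw.  This is a hedged illustration, not the class under review; the class name, the explicit seed, and the lower bound that keeps room for the timestamp are assumptions.

   ```java
   import java.util.Random;

   // Hypothetical sketch of gaussian message sizing.
   class GaussianSizeSketch {
       private final Random random;
       private final int average;
       private final int deviation;
       private final int messagesUntilSizeChange;
       private final int minSize; // assumed floor, e.g. the timestamp width in bytes
       private int remaining = 0;
       private int currentSize;

       GaussianSizeSketch(int average, int deviation, int messagesUntilSizeChange,
                          int minSize, long seed) {
           this.random = new Random(seed);
           this.average = average;
           this.deviation = deviation;
           this.messagesUntilSizeChange = messagesUntilSizeChange;
           this.minSize = minSize;
       }

       // Returns the size for the next message, redrawing it every messagesUntilSizeChange calls.
       int nextSize() {
           if (remaining == 0) {
               long drawn = Math.round(random.nextGaussian() * deviation + average);
               currentSize = (int) Math.max(minSize, drawn);
               remaining = messagesUntilSizeChange;
           }
           remaining--;
           return currentSize;
       }
   }
   ```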
   
   ---
   
   # Testing
   
   ## New Functionality
   
   The ConfigurableProducer workload was tested by running various scenarios and verifying the metrics within the Kafka cluster matched the scenario as defined.
   
   ## Existing Functionality
   
   The ConsumeBench workload was tested without specifying the `recordProcessor` parameter to verify it still behaves as it did prior to this patch set.  All other code paths are in a new workload.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] cmccabe commented on pull request #9736: Add configurable workloads and E2E latency tracking to Trogdor.

Posted by GitBox <gi...@apache.org>.
cmccabe commented on pull request #9736:
URL: https://github.com/apache/kafka/pull/9736#issuecomment-748317827


   > I wanted this timestamp to live outside the Kafka broker/client, as that is what customers will see. We can also validate the numbers within Kafka on this if we want, I just wanted to make sure we captured every bit of latency available. We're running this test on systems that are millisecond synced to get an accurate reading.
   
   OK.
   
   > The Throttle architecture didn't exactly work for the variable throughput generation. In order to add partition-specific testing I had to limit the test to 1 topic. I am by no means an experienced Java engineer and I couldn't find a way of modifying the original, or even subclassing it, without forcing the spec to change.
   
   Hmm.  We could probably do some more work to share code, but in the meantime, this is a nice improvement.
   
   LGTM.  Thanks, @scott-hendricks 





[GitHub] [kafka] cmccabe commented on a change in pull request #9736: Add configurable workloads and E2E latency tracking to Trogdor.

Posted by GitBox <gi...@apache.org>.
cmccabe commented on a change in pull request #9736:
URL: https://github.com/apache/kafka/pull/9736#discussion_r541379098



##########
File path: tools/src/main/java/org/apache/kafka/trogdor/workload/GaussianTimestampRandomPayloadGenerator.java
##########
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.trogdor.workload;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.kafka.common.utils.Time;
+
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.util.Random;
+
+/**
+ * This class behaves identically to TimestampRandomPayloadGenerator, except the message size follows a gaussian
+ * distribution.
+ *
+ * This should be used in conjunction with TimestampRecordProcessor in the Consumer to measure true end-to-end latency
+ * of a system.
+ *
+ * `messageSizeAverage` - The average size in bytes of each message.
+ * `messageSizeDeviation` - The standard deviation to use when calculating message size.
+ * `timestampBytes` - The amount of bytes to use for the timestamp.  Usually 8.
+ * `messagesUntilSizeChange` - The number of messages to keep at the same size.
+ * `seed` - Used to initialize Random() to remove some non-determinism.
+ *
+ * Here is an example spec:
+ *
+ * {
+ *    "type": "gaussianTimestampRandom",
+ *    "messageSizeAverage": 512,
+ *    "messageSizeDeviation": 100,
+ *    "timestampBytes": 8,
+ *    "messagesUntilSizeChange": 100
+ * }
+ *
+ * This will generate messages on a gaussian distribution with an average size each 512-bytes and the first 8 bytes
+ * encoded with the timestamp.  The message sizes will have a standard deviation of 100 bytes, and the size will only
+ * change every 100 messages.  The distribution of messages will be as follows:
+ *
+ *    The average size of the messages are 512 bytes.
+ *    ~68% of the messages are between 412 and 612 bytes
+ *    ~95% of the messages are between 312 and 712 bytes
+ *    ~99% of the messages are between 212 and 812 bytes
+ */
+
+public class GaussianTimestampRandomPayloadGenerator implements PayloadGenerator {
+    private final int messageSizeAverage;
+    private final int messageSizeDeviation;
+    private final int timestampBytes;
+    private final int messagesUntilSizeChange;
+    private final long seed;
+
+    private final Random random = new Random();
+    private final ByteBuffer buffer;
+
+    private int messageTracker = 0;
+    private int messageSize = 0;
+
+    @JsonCreator
+    public GaussianTimestampRandomPayloadGenerator(@JsonProperty("messageSizeAverage") int messageSizeAverage,
+                                                   @JsonProperty("messageSizeDeviation") int messageSizeDeviation,
+                                                   @JsonProperty("timestampBytes") int timestampBytes,

Review comment:
       it would be good to have a default for timestampBytes, I think.  Basically if this gets set to 0, set it to 8 instead?
   
   Also, when would we want to change the number of timestamp bytes?







[GitHub] [kafka] cmccabe merged pull request #9736: Add configurable workloads and E2E latency tracking to Trogdor.

Posted by GitBox <gi...@apache.org>.
cmccabe merged pull request #9736:
URL: https://github.com/apache/kafka/pull/9736


   





[GitHub] [kafka] scott-hendricks commented on a change in pull request #9736: Add configurable workloads and E2E latency tracking to Trogdor.

Posted by GitBox <gi...@apache.org>.
scott-hendricks commented on a change in pull request #9736:
URL: https://github.com/apache/kafka/pull/9736#discussion_r544547465



##########
File path: tools/src/main/java/org/apache/kafka/trogdor/workload/GaussianTimestampRandomPayloadGenerator.java
##########
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.trogdor.workload;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.kafka.common.utils.Time;
+
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.util.Random;
+
+/**
+ * This class behaves identically to TimestampRandomPayloadGenerator, except the message size follows a gaussian
+ * distribution.
+ *
+ * This should be used in conjunction with TimestampRecordProcessor in the Consumer to measure true end-to-end latency
+ * of a system.
+ *
+ * `messageSizeAverage` - The average size in bytes of each message.
+ * `messageSizeDeviation` - The standard deviation to use when calculating message size.
+ * `timestampBytes` - The amount of bytes to use for the timestamp.  Usually 8.
+ * `messagesUntilSizeChange` - The number of messages to keep at the same size.
+ * `seed` - Used to initialize Random() to remove some non-determinism.
+ *
+ * Here is an example spec:
+ *
+ * {
+ *    "type": "gaussianTimestampRandom",
+ *    "messageSizeAverage": 512,
+ *    "messageSizeDeviation": 100,
+ *    "timestampBytes": 8,
+ *    "messagesUntilSizeChange": 100
+ * }
+ *
+ * This will generate messages on a gaussian distribution with an average size each 512-bytes and the first 8 bytes
+ * encoded with the timestamp.  The message sizes will have a standard deviation of 100 bytes, and the size will only
+ * change every 100 messages.  The distribution of messages will be as follows:
+ *
+ *    The average size of the messages are 512 bytes.
+ *    ~68% of the messages are between 412 and 612 bytes
+ *    ~95% of the messages are between 312 and 712 bytes
+ *    ~99% of the messages are between 212 and 812 bytes
+ */
+
+public class GaussianTimestampRandomPayloadGenerator implements PayloadGenerator {
+    private final int messageSizeAverage;
+    private final int messageSizeDeviation;
+    private final int timestampBytes;
+    private final int messagesUntilSizeChange;
+    private final long seed;
+
+    private final Random random = new Random();
+    private final ByteBuffer buffer;
+
+    private int messageTracker = 0;
+    private int messageSize = 0;
+
+    @JsonCreator
+    public GaussianTimestampRandomPayloadGenerator(@JsonProperty("messageSizeAverage") int messageSizeAverage,
+                                                   @JsonProperty("messageSizeDeviation") int messageSizeDeviation,
+                                                   @JsonProperty("timestampBytes") int timestampBytes,

Review comment:
       Removed timestampBytes.







[GitHub] [kafka] scott-hendricks commented on pull request #9736: Add configurable workloads and E2E latency tracking to Trogdor.

Posted by GitBox <gi...@apache.org>.
scott-hendricks commented on pull request #9736:
URL: https://github.com/apache/kafka/pull/9736#issuecomment-744538298


   > Can you say a bit more about the decision to put the timestamp into the message, rather than using the timestamp which Kafka itself puts on every message?
   
   I want to get true End to End message latency, from when the Producer is told to send the message until when the Consumer receives the message to process it.  I wanted this timestamp to live outside the Kafka broker/client, as that is what customers will see.  We can also validate the numbers within Kafka on this if we want, I just wanted to make sure we captured every bit of latency available.  We're running this test on systems that are millisecond synced to get an accurate reading.
   
   This actually came about when discussing https://stackoverflow.com/questions/20520492/how-to-minimize-the-latency-involved-in-kafka-messaging-framework/20585525#20585525 for a completely unrelated reason.  That does not paint Kafka in a good light, so I took it upon myself to try and get some real world latency numbers.
   
   > Is there a reason to have a timestamp size other than 8 bytes?
   
   I wanted to future proof this code against any potential changes in JVM or other architectures.  I can remove it if you want.
   
   > ConfigurableProducerWorker looks a lot like ProduceBenchWorker. Maybe it's OK to have two classes here, but I'd like to understand why we should. ProduceBenchWorker is also intended to be configurable, so I'm not sure why as a user I'd use one or the other. Can we merge these?
   
   That's because it is based on the `ProduceBenchWorker`, however enough of the class was changing that I wanted to keep backward compatible tests working. The `Throttle` architecture didn't exactly work for the variable throughput generation. In order to add partition-specific testing I had to limit the test to 1 topic. I am by no means an experienced Java engineer and I couldn't find a way of modifying the original, or even subclassing it, without forcing the spec to change.





[GitHub] [kafka] cmccabe commented on pull request #9736: Add configurable workloads and E2E latency tracking to Trogdor.

Posted by GitBox <gi...@apache.org>.
cmccabe commented on pull request #9736:
URL: https://github.com/apache/kafka/pull/9736#issuecomment-743467898


   Thanks for this, @scott-hendricks ... it looks really cool.
   
   Can you say a bit more about the decision to put the timestamp into the message, rather than using the timestamp which Kafka itself puts on every message?  I guess one reason to do this would be that you don't have to rely on how the broker is configured to set the timestamp.  It would be good to understand the tradeoffs.
   
   Is there a reason to have a timestamp size other than 8 bytes?  If not, then we don't need to make this configurable.
   
   ConfigurableProducerWorker looks a lot like ProduceBenchWorker.  Maybe it's OK to have two classes here, but I'd like to understand why we should.  ProduceBenchWorker is also intended to be configurable, so I'm not sure why as a user I'd use one or the other.  Can we merge these?
   
   It's nice to see gaussian distributions here where you can set the variance!

