You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by "showuon (via GitHub)" <gi...@apache.org> on 2023/05/03 07:10:50 UTC

[GitHub] [kafka] showuon commented on a diff in pull request #13515: KAFKA-14752: Kafka examples improvements - producer changes

showuon commented on code in PR #13515:
URL: https://github.com/apache/kafka/pull/13515#discussion_r1183301270


##########
examples/src/main/java/kafka/examples/Producer.java:
##########
@@ -21,133 +21,159 @@
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.errors.AuthorizationException;
+import org.apache.kafka.common.errors.FencedInstanceIdException;
+import org.apache.kafka.common.errors.OutOfOrderSequenceException;
+import org.apache.kafka.common.errors.ProducerFencedException;
+import org.apache.kafka.common.errors.SerializationException;
+import org.apache.kafka.common.errors.UnsupportedVersionException;
 import org.apache.kafka.common.serialization.IntegerSerializer;
 import org.apache.kafka.common.serialization.StringSerializer;
 
 import java.util.Properties;
+import java.util.UUID;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
 
 /**
- * Demo producer that demonstrate two modes of KafkaProducer.
- * If the user uses the Async mode: The messages will be printed to stdout upon successful completion
- * If the user uses the sync mode (isAsync = false): Each send loop will block until completion.
+ * A simple producer thread supporting two send modes:
+ * - Async mode (default): records are sent without waiting for the response.
+ * - Sync mode: each send operation blocks waiting for the response.
  */
 public class Producer extends Thread {
-    private final KafkaProducer<Integer, String> producer;
+    private final String bootstrapServers;
     private final String topic;
-    private final Boolean isAsync;
-    private int numRecords;
+    private final boolean isAsync;
+    private final String transactionalId;
+    private final boolean enableIdempotency;
+    private final int numRecords;
+    private final int transactionTimeoutMs;
     private final CountDownLatch latch;
+    private volatile boolean closed;
 
-    public Producer(final String topic,
-                    final Boolean isAsync,
-                    final String transactionalId,
-                    final boolean enableIdempotency,
-                    final int numRecords,
-                    final int transactionTimeoutMs,
-                    final CountDownLatch latch) {
-        Properties props = new Properties();
-        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KafkaProperties.KAFKA_SERVER_URL + ":" + KafkaProperties.KAFKA_SERVER_PORT);
-        props.put(ProducerConfig.CLIENT_ID_CONFIG, "DemoProducer");
-        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class.getName());
-        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
-        if (transactionTimeoutMs > 0) {
-            props.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, transactionTimeoutMs);
-        }
-        if (transactionalId != null) {
-            props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, transactionalId);
-        }
-        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, enableIdempotency);
-        producer = new KafkaProducer<>(props);
-
+    public Producer(String threadName,
+                    String bootstrapServers,
+                    String topic,
+                    boolean isAsync,
+                    String transactionalId,
+                    boolean enableIdempotency,
+                    int numRecords,
+                    int transactionTimeoutMs,
+                    CountDownLatch latch) {
+        super(threadName);
+        this.bootstrapServers = bootstrapServers;
         this.topic = topic;
         this.isAsync = isAsync;
+        this.transactionalId = transactionalId;
+        this.enableIdempotency = enableIdempotency;
         this.numRecords = numRecords;
+        this.transactionTimeoutMs = transactionTimeoutMs;
         this.latch = latch;
     }
 
-    KafkaProducer<Integer, String> get() {
-        return producer;
-    }
-
     @Override
     public void run() {
-        int messageKey = 0;
-        int recordsSent = 0;
-        try {
-            while (recordsSent < numRecords) {
-                final long currentTimeMs = System.currentTimeMillis();
-                produceOnce(messageKey, recordsSent, currentTimeMs);
-                messageKey += 2;
-                recordsSent += 1;
+        int key = 0;
+        int sentRecords = 0;
+        // the producer instance is thread safe
+        try (KafkaProducer<Integer, String> producer = createKafkaProducer()) {
+            while (!closed && sentRecords < numRecords) {
+                if (isAsync) {
+                    asyncSend(producer, key, "test");
+                } else {
+                    syncSend(producer, key, "test");

Review Comment:
   nit: Could we make the value different for each key? Maybe `"test" + key`?



##########
examples/src/main/java/kafka/examples/Producer.java:
##########
@@ -21,133 +21,159 @@
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.errors.AuthorizationException;
+import org.apache.kafka.common.errors.FencedInstanceIdException;
+import org.apache.kafka.common.errors.OutOfOrderSequenceException;
+import org.apache.kafka.common.errors.ProducerFencedException;
+import org.apache.kafka.common.errors.SerializationException;
+import org.apache.kafka.common.errors.UnsupportedVersionException;
 import org.apache.kafka.common.serialization.IntegerSerializer;
 import org.apache.kafka.common.serialization.StringSerializer;
 
 import java.util.Properties;
+import java.util.UUID;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
 
 /**
- * Demo producer that demonstrate two modes of KafkaProducer.
- * If the user uses the Async mode: The messages will be printed to stdout upon successful completion
- * If the user uses the sync mode (isAsync = false): Each send loop will block until completion.
+ * A simple producer thread supporting two send modes:
+ * - Async mode (default): records are sent without waiting for the response.
+ * - Sync mode: each send operation blocks waiting for the response.
  */
 public class Producer extends Thread {
-    private final KafkaProducer<Integer, String> producer;
+    private final String bootstrapServers;
     private final String topic;
-    private final Boolean isAsync;
-    private int numRecords;
+    private final boolean isAsync;
+    private final String transactionalId;
+    private final boolean enableIdempotency;
+    private final int numRecords;
+    private final int transactionTimeoutMs;
     private final CountDownLatch latch;
+    private volatile boolean closed;
 
-    public Producer(final String topic,
-                    final Boolean isAsync,
-                    final String transactionalId,
-                    final boolean enableIdempotency,
-                    final int numRecords,
-                    final int transactionTimeoutMs,
-                    final CountDownLatch latch) {
-        Properties props = new Properties();
-        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KafkaProperties.KAFKA_SERVER_URL + ":" + KafkaProperties.KAFKA_SERVER_PORT);
-        props.put(ProducerConfig.CLIENT_ID_CONFIG, "DemoProducer");
-        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class.getName());
-        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
-        if (transactionTimeoutMs > 0) {
-            props.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, transactionTimeoutMs);
-        }
-        if (transactionalId != null) {
-            props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, transactionalId);
-        }
-        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, enableIdempotency);
-        producer = new KafkaProducer<>(props);
-
+    public Producer(String threadName,
+                    String bootstrapServers,
+                    String topic,
+                    boolean isAsync,
+                    String transactionalId,
+                    boolean enableIdempotency,
+                    int numRecords,
+                    int transactionTimeoutMs,
+                    CountDownLatch latch) {
+        super(threadName);
+        this.bootstrapServers = bootstrapServers;
         this.topic = topic;
         this.isAsync = isAsync;
+        this.transactionalId = transactionalId;
+        this.enableIdempotency = enableIdempotency;
         this.numRecords = numRecords;
+        this.transactionTimeoutMs = transactionTimeoutMs;
         this.latch = latch;
     }
 
-    KafkaProducer<Integer, String> get() {
-        return producer;
-    }
-
     @Override
     public void run() {
-        int messageKey = 0;
-        int recordsSent = 0;
-        try {
-            while (recordsSent < numRecords) {
-                final long currentTimeMs = System.currentTimeMillis();
-                produceOnce(messageKey, recordsSent, currentTimeMs);
-                messageKey += 2;
-                recordsSent += 1;
+        int key = 0;
+        int sentRecords = 0;
+        // the producer instance is thread safe
+        try (KafkaProducer<Integer, String> producer = createKafkaProducer()) {
+            while (!closed && sentRecords < numRecords) {
+                if (isAsync) {
+                    asyncSend(producer, key, "test");
+                } else {
+                    syncSend(producer, key, "test");
+                }
+                key++;
+                sentRecords++;
             }
-        } catch (Exception e) {
-            System.out.println("Producer encountered exception:" + e);
-        } finally {
-            System.out.println("Producer sent " + numRecords + " records successfully");
-            this.producer.close();
-            latch.countDown();
+        } catch (Throwable e) {
+            Utils.printOut("Unhandled exception");
+            e.printStackTrace();
         }
+        Utils.printOut("Sent %d records", sentRecords);
+        shutdown();
     }
 
-    private void produceOnce(final int messageKey, final int recordsSent, final long currentTimeMs) throws ExecutionException, InterruptedException {
-        String messageStr = "Message_" + messageKey;
-
-        if (isAsync) { // Send asynchronously
-            sendAsync(messageKey, messageStr, currentTimeMs);
-            return;
+    public void shutdown() {
+        if (!closed) {
+            closed = true;
+            latch.countDown();
         }
-        Future<RecordMetadata> future = send(messageKey, messageStr);
-        future.get();
-        System.out.println("Sent message: (" + messageKey + ", " + messageStr + ")");
     }
 
-    private void sendAsync(final int messageKey, final String messageStr, final long currentTimeMs) {
-        this.producer.send(new ProducerRecord<>(topic,
-                        messageKey,
-                        messageStr),
-                new DemoCallBack(currentTimeMs, messageKey, messageStr));
+    public KafkaProducer<Integer, String> createKafkaProducer() {
+        Properties props = new Properties();
+        // bootstrap server config is required for producer to connect to brokers
+        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
+        // client id is not required, but it's good to track the source of requests beyond just ip/port
+        // by allowing a logical application name to be included in server-side request logging
+        props.put(ProducerConfig.CLIENT_ID_CONFIG, "client-" + UUID.randomUUID());
+        // key and value are just byte arrays, so we need to set appropriate serializers
+        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
+        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+        if (transactionTimeoutMs > 0) {
+            // max time before the transaction coordinator proactively aborts the ongoing transaction
+            props.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, transactionTimeoutMs);
+        }

Review Comment:
   Thanks for adding comments for each of the changed configs.



##########
examples/src/main/java/kafka/examples/Producer.java:
##########
@@ -21,133 +21,159 @@
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.errors.AuthorizationException;
+import org.apache.kafka.common.errors.FencedInstanceIdException;
+import org.apache.kafka.common.errors.OutOfOrderSequenceException;
+import org.apache.kafka.common.errors.ProducerFencedException;
+import org.apache.kafka.common.errors.SerializationException;
+import org.apache.kafka.common.errors.UnsupportedVersionException;
 import org.apache.kafka.common.serialization.IntegerSerializer;
 import org.apache.kafka.common.serialization.StringSerializer;
 
 import java.util.Properties;
+import java.util.UUID;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
 
 /**
- * Demo producer that demonstrate two modes of KafkaProducer.
- * If the user uses the Async mode: The messages will be printed to stdout upon successful completion
- * If the user uses the sync mode (isAsync = false): Each send loop will block until completion.
+ * A simple producer thread supporting two send modes:
+ * - Async mode (default): records are sent without waiting for the response.
+ * - Sync mode: each send operation blocks waiting for the response.
  */
 public class Producer extends Thread {
-    private final KafkaProducer<Integer, String> producer;
+    private final String bootstrapServers;
     private final String topic;
-    private final Boolean isAsync;
-    private int numRecords;
+    private final boolean isAsync;
+    private final String transactionalId;
+    private final boolean enableIdempotency;
+    private final int numRecords;
+    private final int transactionTimeoutMs;
     private final CountDownLatch latch;
+    private volatile boolean closed;
 
-    public Producer(final String topic,
-                    final Boolean isAsync,
-                    final String transactionalId,
-                    final boolean enableIdempotency,
-                    final int numRecords,
-                    final int transactionTimeoutMs,
-                    final CountDownLatch latch) {
-        Properties props = new Properties();
-        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KafkaProperties.KAFKA_SERVER_URL + ":" + KafkaProperties.KAFKA_SERVER_PORT);
-        props.put(ProducerConfig.CLIENT_ID_CONFIG, "DemoProducer");
-        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class.getName());
-        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
-        if (transactionTimeoutMs > 0) {
-            props.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, transactionTimeoutMs);
-        }
-        if (transactionalId != null) {
-            props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, transactionalId);
-        }
-        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, enableIdempotency);
-        producer = new KafkaProducer<>(props);
-
+    public Producer(String threadName,
+                    String bootstrapServers,
+                    String topic,
+                    boolean isAsync,
+                    String transactionalId,
+                    boolean enableIdempotency,
+                    int numRecords,
+                    int transactionTimeoutMs,
+                    CountDownLatch latch) {
+        super(threadName);
+        this.bootstrapServers = bootstrapServers;
         this.topic = topic;
         this.isAsync = isAsync;
+        this.transactionalId = transactionalId;
+        this.enableIdempotency = enableIdempotency;
         this.numRecords = numRecords;
+        this.transactionTimeoutMs = transactionTimeoutMs;
         this.latch = latch;
     }
 
-    KafkaProducer<Integer, String> get() {
-        return producer;
-    }
-
     @Override
     public void run() {
-        int messageKey = 0;
-        int recordsSent = 0;
-        try {
-            while (recordsSent < numRecords) {
-                final long currentTimeMs = System.currentTimeMillis();
-                produceOnce(messageKey, recordsSent, currentTimeMs);
-                messageKey += 2;
-                recordsSent += 1;
+        int key = 0;
+        int sentRecords = 0;
+        // the producer instance is thread safe
+        try (KafkaProducer<Integer, String> producer = createKafkaProducer()) {
+            while (!closed && sentRecords < numRecords) {
+                if (isAsync) {
+                    asyncSend(producer, key, "test");
+                } else {
+                    syncSend(producer, key, "test");
+                }
+                key++;
+                sentRecords++;
             }
-        } catch (Exception e) {
-            System.out.println("Producer encountered exception:" + e);
-        } finally {
-            System.out.println("Producer sent " + numRecords + " records successfully");
-            this.producer.close();
-            latch.countDown();
+        } catch (Throwable e) {
+            Utils.printOut("Unhandled exception");
+            e.printStackTrace();
         }
+        Utils.printOut("Sent %d records", sentRecords);
+        shutdown();
     }
 
-    private void produceOnce(final int messageKey, final int recordsSent, final long currentTimeMs) throws ExecutionException, InterruptedException {
-        String messageStr = "Message_" + messageKey;
-
-        if (isAsync) { // Send asynchronously
-            sendAsync(messageKey, messageStr, currentTimeMs);
-            return;
+    public void shutdown() {
+        if (!closed) {
+            closed = true;
+            latch.countDown();
         }
-        Future<RecordMetadata> future = send(messageKey, messageStr);
-        future.get();
-        System.out.println("Sent message: (" + messageKey + ", " + messageStr + ")");
     }
 
-    private void sendAsync(final int messageKey, final String messageStr, final long currentTimeMs) {
-        this.producer.send(new ProducerRecord<>(topic,
-                        messageKey,
-                        messageStr),
-                new DemoCallBack(currentTimeMs, messageKey, messageStr));
+    public KafkaProducer<Integer, String> createKafkaProducer() {
+        Properties props = new Properties();
+        // bootstrap server config is required for producer to connect to brokers
+        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
+        // client id is not required, but it's good to track the source of requests beyond just ip/port
+        // by allowing a logical application name to be included in server-side request logging
+        props.put(ProducerConfig.CLIENT_ID_CONFIG, "client-" + UUID.randomUUID());
+        // key and value are just byte arrays, so we need to set appropriate serializers
+        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
+        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+        if (transactionTimeoutMs > 0) {
+            // max time before the transaction coordinator proactively aborts the ongoing transaction
+            props.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, transactionTimeoutMs);
+        }
+        if (transactionalId != null) {
+            // the transactional id must be static and unique
+            // it is used to identify the same producer instance across process restarts
+            props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, transactionalId);
+        }
+        // enable duplicates protection at the partition level
+        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, enableIdempotency);
+        return new KafkaProducer<>(props);
     }
 
-    private Future<RecordMetadata> send(final int messageKey, final String messageStr) {
-        return producer.send(new ProducerRecord<>(topic,
-                messageKey,
-                messageStr));
+    private void asyncSend(KafkaProducer<Integer, String> producer, int key, String value) {
+        // send the record asynchronously, setting a callback to be notified of the result
+        // note tha send blocks when buffer.memory is full or metadata are not available
+        producer.send(new ProducerRecord<>(topic, key, value), new ProducerCallback(key, value));
     }
-}
 
-class DemoCallBack implements Callback {
+    private RecordMetadata syncSend(KafkaProducer<Integer, String> producer, int key, String value)
+        throws ExecutionException, InterruptedException {
+        try {
+            // send the record and then call get, which blocks waiting for the ack from the broker
+            RecordMetadata metadata = producer.send(new ProducerRecord<>(topic, key, value)).get();
+            Utils.maybePrintRecord(numRecords, key, value, metadata);
+            return metadata;
+        } catch (AuthorizationException | UnsupportedVersionException | ProducerFencedException
+                 | FencedInstanceIdException | OutOfOrderSequenceException | SerializationException e) {
+            Utils.printErr(e.getMessage());
+            // we can't recover from these exceptions
+            shutdown();
+        } catch (KafkaException e) {
+            Utils.printErr(e.getMessage());
+        }
+        return null;
+    }
 
-    private final long startTime;
-    private final int key;
-    private final String message;
+    class ProducerCallback implements Callback {
+        private final int key;
+        private final String value;
 
-    public DemoCallBack(long startTime, int key, String message) {
-        this.startTime = startTime;
-        this.key = key;
-        this.message = message;
-    }
+        public ProducerCallback(int key, String value) {
+            this.key = key;
+            this.value = value;
+        }
 
-    /**
-     * A callback method the user can implement to provide asynchronous handling of request completion. This method will
-     * be called when the record sent to the server has been acknowledged. When exception is not null in the callback,
-     * metadata will contain the special -1 value for all fields except for topicPartition, which will be valid.
-     *
-     * @param metadata  The metadata for the record that was sent (i.e. the partition and offset). An empty metadata
-     *                  with -1 value for all fields except for topicPartition will be returned if an error occurred.
-     * @param exception The exception thrown during processing of this record. Null if no error occurred.
-     */

Review Comment:
   These javadocs are helpful. Could we add them back? 



##########
examples/src/main/java/kafka/examples/Producer.java:
##########
@@ -21,133 +21,159 @@
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.errors.AuthorizationException;
+import org.apache.kafka.common.errors.FencedInstanceIdException;
+import org.apache.kafka.common.errors.OutOfOrderSequenceException;
+import org.apache.kafka.common.errors.ProducerFencedException;
+import org.apache.kafka.common.errors.SerializationException;
+import org.apache.kafka.common.errors.UnsupportedVersionException;
 import org.apache.kafka.common.serialization.IntegerSerializer;
 import org.apache.kafka.common.serialization.StringSerializer;
 
 import java.util.Properties;
+import java.util.UUID;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
 
 /**
- * Demo producer that demonstrate two modes of KafkaProducer.
- * If the user uses the Async mode: The messages will be printed to stdout upon successful completion
- * If the user uses the sync mode (isAsync = false): Each send loop will block until completion.
+ * A simple producer thread supporting two send modes:
+ * - Async mode (default): records are sent without waiting for the response.
+ * - Sync mode: each send operation blocks waiting for the response.
  */
 public class Producer extends Thread {
-    private final KafkaProducer<Integer, String> producer;
+    private final String bootstrapServers;
     private final String topic;
-    private final Boolean isAsync;
-    private int numRecords;
+    private final boolean isAsync;
+    private final String transactionalId;
+    private final boolean enableIdempotency;
+    private final int numRecords;
+    private final int transactionTimeoutMs;
     private final CountDownLatch latch;
+    private volatile boolean closed;
 
-    public Producer(final String topic,
-                    final Boolean isAsync,
-                    final String transactionalId,
-                    final boolean enableIdempotency,
-                    final int numRecords,
-                    final int transactionTimeoutMs,
-                    final CountDownLatch latch) {
-        Properties props = new Properties();
-        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KafkaProperties.KAFKA_SERVER_URL + ":" + KafkaProperties.KAFKA_SERVER_PORT);
-        props.put(ProducerConfig.CLIENT_ID_CONFIG, "DemoProducer");
-        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class.getName());
-        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
-        if (transactionTimeoutMs > 0) {
-            props.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, transactionTimeoutMs);
-        }
-        if (transactionalId != null) {
-            props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, transactionalId);
-        }
-        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, enableIdempotency);
-        producer = new KafkaProducer<>(props);
-
+    public Producer(String threadName,
+                    String bootstrapServers,
+                    String topic,
+                    boolean isAsync,
+                    String transactionalId,
+                    boolean enableIdempotency,
+                    int numRecords,
+                    int transactionTimeoutMs,
+                    CountDownLatch latch) {
+        super(threadName);
+        this.bootstrapServers = bootstrapServers;
         this.topic = topic;
         this.isAsync = isAsync;
+        this.transactionalId = transactionalId;
+        this.enableIdempotency = enableIdempotency;
         this.numRecords = numRecords;
+        this.transactionTimeoutMs = transactionTimeoutMs;
         this.latch = latch;
     }
 
-    KafkaProducer<Integer, String> get() {
-        return producer;
-    }
-
     @Override
     public void run() {
-        int messageKey = 0;
-        int recordsSent = 0;
-        try {
-            while (recordsSent < numRecords) {
-                final long currentTimeMs = System.currentTimeMillis();
-                produceOnce(messageKey, recordsSent, currentTimeMs);
-                messageKey += 2;
-                recordsSent += 1;
+        int key = 0;
+        int sentRecords = 0;
+        // the producer instance is thread safe
+        try (KafkaProducer<Integer, String> producer = createKafkaProducer()) {
+            while (!closed && sentRecords < numRecords) {
+                if (isAsync) {
+                    asyncSend(producer, key, "test");
+                } else {
+                    syncSend(producer, key, "test");
+                }
+                key++;
+                sentRecords++;
             }
-        } catch (Exception e) {
-            System.out.println("Producer encountered exception:" + e);
-        } finally {
-            System.out.println("Producer sent " + numRecords + " records successfully");
-            this.producer.close();
-            latch.countDown();
+        } catch (Throwable e) {
+            Utils.printOut("Unhandled exception");
+            e.printStackTrace();
         }
+        Utils.printOut("Sent %d records", sentRecords);
+        shutdown();
     }
 
-    private void produceOnce(final int messageKey, final int recordsSent, final long currentTimeMs) throws ExecutionException, InterruptedException {
-        String messageStr = "Message_" + messageKey;
-
-        if (isAsync) { // Send asynchronously
-            sendAsync(messageKey, messageStr, currentTimeMs);
-            return;
+    public void shutdown() {
+        if (!closed) {
+            closed = true;
+            latch.countDown();
         }
-        Future<RecordMetadata> future = send(messageKey, messageStr);
-        future.get();
-        System.out.println("Sent message: (" + messageKey + ", " + messageStr + ")");
     }
 
-    private void sendAsync(final int messageKey, final String messageStr, final long currentTimeMs) {
-        this.producer.send(new ProducerRecord<>(topic,
-                        messageKey,
-                        messageStr),
-                new DemoCallBack(currentTimeMs, messageKey, messageStr));
+    public KafkaProducer<Integer, String> createKafkaProducer() {
+        Properties props = new Properties();
+        // bootstrap server config is required for producer to connect to brokers
+        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
+        // client id is not required, but it's good to track the source of requests beyond just ip/port
+        // by allowing a logical application name to be included in server-side request logging
+        props.put(ProducerConfig.CLIENT_ID_CONFIG, "client-" + UUID.randomUUID());
+        // key and value are just byte arrays, so we need to set appropriate serializers
+        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
+        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+        if (transactionTimeoutMs > 0) {
+            // max time before the transaction coordinator proactively aborts the ongoing transaction
+            props.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, transactionTimeoutMs);
+        }
+        if (transactionalId != null) {
+            // the transactional id must be static and unique
+            // it is used to identify the same producer instance across process restarts
+            props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, transactionalId);
+        }
+        // enable duplicates protection at the partition level
+        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, enableIdempotency);
+        return new KafkaProducer<>(props);
     }
 
-    private Future<RecordMetadata> send(final int messageKey, final String messageStr) {
-        return producer.send(new ProducerRecord<>(topic,
-                messageKey,
-                messageStr));
+    private void asyncSend(KafkaProducer<Integer, String> producer, int key, String value) {
+        // send the record asynchronously, setting a callback to be notified of the result
+        // note that send blocks when buffer.memory is full or metadata is not available
+        producer.send(new ProducerRecord<>(topic, key, value), new ProducerCallback(key, value));
     }
-}
 
-class DemoCallBack implements Callback {
+    private RecordMetadata syncSend(KafkaProducer<Integer, String> producer, int key, String value)
+        throws ExecutionException, InterruptedException {
+        try {
+            // send the record and then call get, which blocks waiting for the ack from the broker
+            RecordMetadata metadata = producer.send(new ProducerRecord<>(topic, key, value)).get();
+            Utils.maybePrintRecord(numRecords, key, value, metadata);
+            return metadata;
+        } catch (AuthorizationException | UnsupportedVersionException | ProducerFencedException
+                 | FencedInstanceIdException | OutOfOrderSequenceException | SerializationException e) {
+            Utils.printErr(e.getMessage());
+            // we can't recover from these exceptions
+            shutdown();
+        } catch (KafkaException e) {
+            Utils.printErr(e.getMessage());
+        }
+        return null;
+    }
 
-    private final long startTime;
-    private final int key;
-    private final String message;
+    class ProducerCallback implements Callback {
+        private final int key;
+        private final String value;
 
-    public DemoCallBack(long startTime, int key, String message) {
-        this.startTime = startTime;
-        this.key = key;
-        this.message = message;
-    }
+        public ProducerCallback(int key, String value) {
+            this.key = key;
+            this.value = value;
+        }
 
-    /**
-     * A callback method the user can implement to provide asynchronous handling of request completion. This method will
-     * be called when the record sent to the server has been acknowledged. When exception is not null in the callback,
-     * metadata will contain the special -1 value for all fields except for topicPartition, which will be valid.
-     *
-     * @param metadata  The metadata for the record that was sent (i.e. the partition and offset). An empty metadata
-     *                  with -1 value for all fields except for topicPartition will be returned if an error occurred.
-     * @param exception The exception thrown during processing of this record. Null if no error occurred.
-     */
-    public void onCompletion(RecordMetadata metadata, Exception exception) {
-        long elapsedTime = System.currentTimeMillis() - startTime;
-        if (metadata != null) {
-            System.out.println(
-                "message(" + key + ", " + message + ") sent to partition(" + metadata.partition() +
-                    "), " +
-                    "offset(" + metadata.offset() + ") in " + elapsedTime + " ms");
-        } else {
-            exception.printStackTrace();
+        public void onCompletion(RecordMetadata metadata, Exception e) {
+            if (e != null) {
+                Utils.printErr(e.getMessage());
+                if (e instanceof AuthorizationException
+                    || e instanceof UnsupportedVersionException
+                    || e instanceof ProducerFencedException
+                    || e instanceof FencedInstanceIdException
+                    || e instanceof OutOfOrderSequenceException
+                    || e instanceof SerializationException) {

Review Comment:
   I'm curious: where did you get this list of exceptions that we can't recover from? Is it the complete list? How about `UnsupportedForMessageFormatException`? There are also some exceptions listed [here](https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/producer/Callback.java#L36). Should we add those as well?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org