You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2020/02/12 20:46:10 UTC

[kafka] branch 2.5 updated (8e6d24a -> a017dcb)

This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a change to branch 2.5
in repository https://gitbox.apache.org/repos/asf/kafka.git.


    from 8e6d24a  KAFKA-9499; Improve deletion process by batching more aggressively (#8053)
     new a4b2a08  KAFKA-9447: Add new customized EOS model example (#8031)
     new c853929  HOTFIX: Fix spotsbug failure in Kafka examples (#8051)
     new a017dcb  KAFKA-9417: New Integration Test for KIP-447 (#8000)

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 checkstyle/suppressions.xml                        |   2 +-
 .../kafka/clients/admin/DescribeTopicsResult.java  |  23 +--
 .../kafka/clients/admin/ListTopicsResult.java      |  14 +-
 .../consumer/internals/ConsumerCoordinator.java    |   2 +-
 .../kafka/clients/consumer/internals/Fetcher.java  |  11 +-
 .../producer/internals/TransactionManager.java     |   2 +
 .../kafka/coordinator/group/GroupCoordinator.scala |   4 +-
 .../transaction/TransactionStateManager.scala      |   2 +-
 .../main/scala/kafka/tools/ConsoleConsumer.scala   |  10 +-
 examples/README                                    |   8 +-
 ...imple-consumer-demo.sh => exactly-once-demo.sh} |   7 +-
 .../src/main/java/kafka/examples/Consumer.java     |  32 +++-
 .../examples/ExactlyOnceMessageProcessor.java      | 212 +++++++++++++++++++++
 .../kafka/examples/KafkaConsumerProducerDemo.java  |  18 +-
 .../java/kafka/examples/KafkaExactlyOnceDemo.java  | 193 +++++++++++++++++++
 .../src/main/java/kafka/examples/Producer.java     |  46 ++++-
 tests/docker/ducker-ak                             |   2 +
 tests/kafkatest/services/console_consumer.py       |  13 +-
 tests/kafkatest/services/kafka/kafka.py            |   2 +-
 .../services/transactional_message_copier.py       |  16 +-
 tests/kafkatest/services/verifiable_producer.py    |  14 +-
 ...ons_test.py => group_mode_transactions_test.py} | 171 +++++++++++------
 tests/kafkatest/tests/core/transactions_test.py    |  34 ++--
 .../kafka/tools/TransactionalMessageCopier.java    | 153 ++++++++++-----
 .../org/apache/kafka/tools/VerifiableConsumer.java |  10 +-
 25 files changed, 815 insertions(+), 186 deletions(-)
 copy examples/bin/{java-simple-consumer-demo.sh => exactly-once-demo.sh} (91%)
 create mode 100644 examples/src/main/java/kafka/examples/ExactlyOnceMessageProcessor.java
 create mode 100644 examples/src/main/java/kafka/examples/KafkaExactlyOnceDemo.java
 copy tests/kafkatest/tests/core/{transactions_test.py => group_mode_transactions_test.py} (60%)


[kafka] 03/03: KAFKA-9417: New Integration Test for KIP-447 (#8000)

Posted by gu...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch 2.5
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit a017dcbd3e7d059a0ca4e76777a42912e6b91e65
Author: Boyang Chen <bo...@confluent.io>
AuthorDate: Wed Feb 12 12:34:12 2020 -0800

    KAFKA-9417: New Integration Test for KIP-447 (#8000)
    
    This change mainly have 2 components:
    
    1. extend the existing transactions_test.py to also try out new sendTxnOffsets(groupMetadata) API to make sure we are not introducing any regression or compatibility issue
      a. We shrink the time window to 10 seconds for the txn timeout scheduler on broker so that we could trigger expiration earlier than later
    
    2. create a completely new system test class called group_mode_transactions_test which is more complicated than the existing system test, as we are taking rebalance into consideration and using multiple partitions instead of one. For further breakdown:
      a. The message count was done on partition level, instead of global as we need to visualize
    the per partition order throughout the test. For this sake, we extend ConsoleConsumer to print out the data partition as well to help message copier interpret the per partition data.
      b. The progress count includes the time for completing the pending txn offset expiration
      c. More visibility and feature improvements on TransactionMessageCopier to better work under either standalone or group mode.
    
    Reviewers: Matthias J. Sax <ma...@confluent.io>, Guozhang Wang <wa...@gmail.com>
---
 checkstyle/suppressions.xml                        |   2 +-
 .../consumer/internals/ConsumerCoordinator.java    |   2 +-
 .../kafka/clients/consumer/internals/Fetcher.java  |  11 +-
 .../producer/internals/TransactionManager.java     |   2 +
 .../kafka/coordinator/group/GroupCoordinator.scala |   4 +-
 .../transaction/TransactionStateManager.scala      |   2 +-
 .../main/scala/kafka/tools/ConsoleConsumer.scala   |  10 +-
 .../examples/ExactlyOnceMessageProcessor.java      |   5 +-
 .../kafka/examples/KafkaConsumerProducerDemo.java  |   2 +-
 .../java/kafka/examples/KafkaExactlyOnceDemo.java  |   2 +-
 .../src/main/java/kafka/examples/Producer.java     |   4 +
 tests/docker/ducker-ak                             |   2 +
 tests/kafkatest/services/console_consumer.py       |  13 +-
 tests/kafkatest/services/kafka/kafka.py            |   2 +-
 .../services/transactional_message_copier.py       |  16 +-
 tests/kafkatest/services/verifiable_producer.py    |  14 +-
 ...ons_test.py => group_mode_transactions_test.py} | 171 +++++++++++++--------
 tests/kafkatest/tests/core/transactions_test.py    |  34 ++--
 .../kafka/tools/TransactionalMessageCopier.java    | 153 ++++++++++++------
 .../org/apache/kafka/tools/VerifiableConsumer.java |  10 +-
 20 files changed, 316 insertions(+), 145 deletions(-)

diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index 9b05f59..7ebe7fc 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -245,7 +245,7 @@
     <suppress checks="BooleanExpressionComplexity"
               files="StreamsResetter.java"/>
     <suppress checks="NPathComplexity"
-              files="(ProducerPerformance|StreamsResetter|Agent).java"/>
+              files="(ProducerPerformance|StreamsResetter|Agent|TransactionalMessageCopier).java"/>
     <suppress checks="ImportControl"
               files="SignalLogger.java"/>
     <suppress checks="IllegalImport"
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
index 999921f..7a5f627 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
@@ -1291,7 +1291,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
                 // just retry
                 log.info("The following partitions still have unstable offsets " +
                              "which are not cleared on the broker side: {}" +
-                             ", this could be either" +
+                             ", this could be either " +
                              "transactional offsets waiting for completion, or " +
                              "normal offsets waiting for replication after appending to local log", unstableTxnOffsetTopicPartitions);
                 future.raise(new UnstableOffsetCommitException("There are unstable offsets for the requested topic partitions"));
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
index 7890c9a..f0aaa13 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
@@ -1018,15 +1018,16 @@ public class Fetcher<K, V> implements Closeable {
                        error == Errors.REPLICA_NOT_AVAILABLE ||
                        error == Errors.KAFKA_STORAGE_ERROR ||
                        error == Errors.OFFSET_NOT_AVAILABLE ||
-                       error == Errors.LEADER_NOT_AVAILABLE) {
-                log.debug("Attempt to fetch offsets for partition {} failed due to {}, retrying.",
-                        topicPartition, error);
-                partitionsToRetry.add(topicPartition);
-            } else if (error == Errors.FENCED_LEADER_EPOCH ||
+                       error == Errors.LEADER_NOT_AVAILABLE ||
                        error == Errors.UNKNOWN_LEADER_EPOCH) {
                 log.debug("Attempt to fetch offsets for partition {} failed due to {}, retrying.",
                         topicPartition, error);
                 partitionsToRetry.add(topicPartition);
+            } else if (error == Errors.FENCED_LEADER_EPOCH) {
+                log.debug("Attempt to fetch offsets for partition {} failed due to fenced leader epoch, refresh " +
+                              "the metadata and retrying.", topicPartition);
+                metadata.requestUpdate();
+                partitionsToRetry.add(topicPartition);
             } else if (error == Errors.UNKNOWN_TOPIC_OR_PARTITION) {
                 log.warn("Received unknown topic or partition error in ListOffset request for partition {}", topicPartition);
                 partitionsToRetry.add(topicPartition);
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
index 0872d62..18ab408 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
@@ -66,6 +66,7 @@ import java.util.Comparator;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
+import java.util.Locale;
 import java.util.Map;
 import java.util.OptionalInt;
 import java.util.OptionalLong;
@@ -1329,6 +1330,7 @@ public class TransactionManager {
                         transactionCoordinator = node;
                 }
                 result.done();
+                log.info("Discovered {} coordinator {}", coordinatorType.toString().toLowerCase(Locale.ROOT), node);
             } else if (error == Errors.COORDINATOR_NOT_AVAILABLE) {
                 reenqueue();
             } else if (error == Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED) {
diff --git a/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala b/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
index 90f391a..4843154 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
@@ -240,13 +240,13 @@ class GroupCoordinator(val brokerId: Int,
         } else if (requireKnownMemberId) {
             // If member id required (dynamic membership), register the member in the pending member list
             // and send back a response to call for another join group request with allocated member id.
-          debug(s"Dynamic member with unknown member id rejoins group ${group.groupId} in " +
+          debug(s"Dynamic member with unknown member id joins group ${group.groupId} in " +
               s"${group.currentState} state. Created a new member id $newMemberId and request the member to rejoin with this id.")
           group.addPendingMember(newMemberId)
           addPendingMemberExpiration(group, newMemberId, sessionTimeoutMs)
           responseCallback(JoinGroupResult(newMemberId, Errors.MEMBER_ID_REQUIRED))
         } else {
-          debug(s"Dynamic member with unknown member id rejoins group ${group.groupId} in " +
+          debug(s"Dynamic member with unknown member id joins group ${group.groupId} in " +
             s"${group.currentState} state. Created a new member id $newMemberId for this member and add to the group.")
           addMemberAndRebalance(rebalanceTimeoutMs, sessionTimeoutMs, newMemberId, groupInstanceId,
             clientId, clientHost, protocolType, protocols, group, responseCallback)
diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala b/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
index da7502c..174e3a5 100644
--- a/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
+++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
@@ -47,7 +47,7 @@ object TransactionStateManager {
   // default transaction management config values
   val DefaultTransactionsMaxTimeoutMs: Int = TimeUnit.MINUTES.toMillis(15).toInt
   val DefaultTransactionalIdExpirationMs: Int = TimeUnit.DAYS.toMillis(7).toInt
-  val DefaultAbortTimedOutTransactionsIntervalMs: Int = TimeUnit.MINUTES.toMillis(1).toInt
+  val DefaultAbortTimedOutTransactionsIntervalMs: Int = TimeUnit.SECONDS.toMillis(10).toInt
   val DefaultRemoveExpiredTransactionalIdsIntervalMs: Int = TimeUnit.HOURS.toMillis(1).toInt
 }
 
diff --git a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
index 691cde5..92c6c9c 100755
--- a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
+++ b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
@@ -455,9 +455,10 @@ object ConsoleConsumer extends Logging {
 }
 
 class DefaultMessageFormatter extends MessageFormatter {
+  var printTimestamp = false
   var printKey = false
   var printValue = true
-  var printTimestamp = false
+  var printPartition = false
   var keySeparator = "\t".getBytes(StandardCharsets.UTF_8)
   var lineSeparator = "\n".getBytes(StandardCharsets.UTF_8)
 
@@ -471,6 +472,8 @@ class DefaultMessageFormatter extends MessageFormatter {
       printKey = props.getProperty("print.key").trim.equalsIgnoreCase("true")
     if (props.containsKey("print.value"))
       printValue = props.getProperty("print.value").trim.equalsIgnoreCase("true")
+    if (props.containsKey("print.partition"))
+      printPartition = props.getProperty("print.partition").trim.equalsIgnoreCase("true")
     if (props.containsKey("key.separator"))
       keySeparator = props.getProperty("key.separator").getBytes(StandardCharsets.UTF_8)
     if (props.containsKey("line.separator"))
@@ -531,6 +534,11 @@ class DefaultMessageFormatter extends MessageFormatter {
 
     if (printValue) {
       write(valueDeserializer, value, topic)
+      writeSeparator(printPartition)
+    }
+
+    if (printPartition) {
+      output.write(s"$partition".getBytes(StandardCharsets.UTF_8))
       output.write(lineSeparator)
     }
   }
diff --git a/examples/src/main/java/kafka/examples/ExactlyOnceMessageProcessor.java b/examples/src/main/java/kafka/examples/ExactlyOnceMessageProcessor.java
index 53685f3..482e442 100644
--- a/examples/src/main/java/kafka/examples/ExactlyOnceMessageProcessor.java
+++ b/examples/src/main/java/kafka/examples/ExactlyOnceMessageProcessor.java
@@ -76,8 +76,11 @@ public class ExactlyOnceMessageProcessor extends Thread {
         this.numInstances = numInstances;
         this.instanceIdx = instanceIdx;
         this.transactionalId = "Processor-" + instanceIdx;
+        // If we are using the group mode, it is recommended to have a relatively short txn timeout
+        // in order to clear pending offsets faster.
+        final int transactionTimeoutMs = this.mode.equals("groupMode") ? 10000 : -1;
         // A unique transactional.id must be provided in order to properly use EOS.
-        producer = new Producer(outputTopic, true, transactionalId, true, -1, null).get();
+        producer = new Producer(outputTopic, true, transactionalId, true, -1, transactionTimeoutMs, null).get();
         // Consumer must be in read_committed mode, which means it won't be able to read uncommitted data.
         consumer = new Consumer(inputTopic, consumerGroupId, READ_COMMITTED, -1, null).get();
         this.latch = latch;
diff --git a/examples/src/main/java/kafka/examples/KafkaConsumerProducerDemo.java b/examples/src/main/java/kafka/examples/KafkaConsumerProducerDemo.java
index 21d85c3..8a29402 100644
--- a/examples/src/main/java/kafka/examples/KafkaConsumerProducerDemo.java
+++ b/examples/src/main/java/kafka/examples/KafkaConsumerProducerDemo.java
@@ -25,7 +25,7 @@ public class KafkaConsumerProducerDemo {
     public static void main(String[] args) throws InterruptedException {
         boolean isAsync = args.length == 0 || !args[0].trim().equalsIgnoreCase("sync");
         CountDownLatch latch = new CountDownLatch(2);
-        Producer producerThread = new Producer(KafkaProperties.TOPIC, isAsync, null, false, 10000, latch);
+        Producer producerThread = new Producer(KafkaProperties.TOPIC, isAsync, null, false, 10000, -1, latch);
         producerThread.start();
 
         Consumer consumerThread = new Consumer(KafkaProperties.TOPIC, "DemoConsumer", false, 10000, latch);
diff --git a/examples/src/main/java/kafka/examples/KafkaExactlyOnceDemo.java b/examples/src/main/java/kafka/examples/KafkaExactlyOnceDemo.java
index 288b786..6da159c 100644
--- a/examples/src/main/java/kafka/examples/KafkaExactlyOnceDemo.java
+++ b/examples/src/main/java/kafka/examples/KafkaExactlyOnceDemo.java
@@ -88,7 +88,7 @@ public class KafkaExactlyOnceDemo {
         CountDownLatch prePopulateLatch = new CountDownLatch(1);
 
         /* Stage 2: pre-populate records */
-        Producer producerThread = new Producer(INPUT_TOPIC, false, null, true, numRecords, prePopulateLatch);
+        Producer producerThread = new Producer(INPUT_TOPIC, false, null, true, numRecords, -1, prePopulateLatch);
         producerThread.start();
 
         if (!prePopulateLatch.await(5, TimeUnit.MINUTES)) {
diff --git a/examples/src/main/java/kafka/examples/Producer.java b/examples/src/main/java/kafka/examples/Producer.java
index 3805dd3..da8c00b 100644
--- a/examples/src/main/java/kafka/examples/Producer.java
+++ b/examples/src/main/java/kafka/examples/Producer.java
@@ -40,12 +40,16 @@ public class Producer extends Thread {
                     final String transactionalId,
                     final boolean enableIdempotency,
                     final int numRecords,
+                    final int transactionTimeoutMs,
                     final CountDownLatch latch) {
         Properties props = new Properties();
         props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KafkaProperties.KAFKA_SERVER_URL + ":" + KafkaProperties.KAFKA_SERVER_PORT);
         props.put(ProducerConfig.CLIENT_ID_CONFIG, "DemoProducer");
         props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class.getName());
         props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
+        if (transactionTimeoutMs > 0) {
+            props.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, transactionTimeoutMs);
+        }
         if (transactionalId != null) {
             props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, transactionalId);
         }
diff --git a/tests/docker/ducker-ak b/tests/docker/ducker-ak
index a29395e..f923de2 100755
--- a/tests/docker/ducker-ak
+++ b/tests/docker/ducker-ak
@@ -223,6 +223,8 @@ ducker_build() {
     SECONDS=0
 
     must_pushd "${ducker_dir}"
+    # Tip: if you are scratching your head for some dependency problems that are referring to an old code version
+    # (for example java.lang.NoClassDefFoundError), add --no-cache flag to the build shall give you a clean start.
     must_do -v -o docker build --memory="${docker_build_memory_limit}" \
         --build-arg "ducker_creator=${user_name}" --build-arg "jdk_version=${jdk_version}" -t "${image_name}" \
         -f "${ducker_dir}/Dockerfile" ${docker_args} -- .
diff --git a/tests/kafkatest/services/console_consumer.py b/tests/kafkatest/services/console_consumer.py
index 5fd4712..0811bcd 100644
--- a/tests/kafkatest/services/console_consumer.py
+++ b/tests/kafkatest/services/console_consumer.py
@@ -60,7 +60,7 @@ class ConsoleConsumer(KafkaPathResolverMixin, JmxMixin, BackgroundThreadService)
     def __init__(self, context, num_nodes, kafka, topic, group_id="test-consumer-group", new_consumer=True,
                  message_validator=None, from_beginning=True, consumer_timeout_ms=None, version=DEV_BRANCH,
                  client_id="console-consumer", print_key=False, jmx_object_names=None, jmx_attributes=None,
-                 enable_systest_events=False, stop_timeout_sec=35, print_timestamp=False,
+                 enable_systest_events=False, stop_timeout_sec=35, print_timestamp=False, print_partition=False,
                  isolation_level="read_uncommitted", jaas_override_variables=None,
                  kafka_opts_override="", client_prop_file_override="", consumer_properties={}):
         """
@@ -76,12 +76,13 @@ class ConsoleConsumer(KafkaPathResolverMixin, JmxMixin, BackgroundThreadService)
                                         successively consumed messages exceeds this timeout. Setting this and
                                         waiting for the consumer to stop is a pretty good way to consume all messages
                                         in a topic.
+            print_timestamp             if True, print each message's timestamp as well
             print_key                   if True, print each message's key as well
+            print_partition             if True, print each message's partition as well
             enable_systest_events       if True, console consumer will print additional lifecycle-related information
                                         only available in 0.10.0 and later.
             stop_timeout_sec            After stopping a node, wait up to stop_timeout_sec for the node to stop,
                                         and the corresponding background thread to finish successfully.
-            print_timestamp             if True, print each message's timestamp as well
             isolation_level             How to handle transactional messages.
             jaas_override_variables     A dict of variables to be used in the jaas.conf template file
             kafka_opts_override         Override parameters of the KAFKA_OPTS environment variable
@@ -108,6 +109,7 @@ class ConsoleConsumer(KafkaPathResolverMixin, JmxMixin, BackgroundThreadService)
         self.clean_shutdown_nodes = set()
         self.client_id = client_id
         self.print_key = print_key
+        self.print_partition = print_partition
         self.log_level = "TRACE"
         self.stop_timeout_sec = stop_timeout_sec
 
@@ -191,11 +193,14 @@ class ConsoleConsumer(KafkaPathResolverMixin, JmxMixin, BackgroundThreadService)
             if node.version > LATEST_0_8_2:
                 cmd += " --timeout-ms %s" % self.consumer_timeout_ms
 
+        if self.print_timestamp:
+            cmd += " --property print.timestamp=true"
+
         if self.print_key:
             cmd += " --property print.key=true"
 
-        if self.print_timestamp:
-            cmd += " --property print.timestamp=true"
+        if self.print_partition:
+            cmd += " --property print.partition=true"
 
         # LoggingMessageFormatter was introduced after 0.9
         if node.version > LATEST_0_9:
diff --git a/tests/kafkatest/services/kafka/kafka.py b/tests/kafkatest/services/kafka/kafka.py
index acb2137..e59431e 100644
--- a/tests/kafkatest/services/kafka/kafka.py
+++ b/tests/kafkatest/services/kafka/kafka.py
@@ -829,7 +829,7 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
         """
         Check whether a broker is registered in Zookeeper
         """
-        self.logger.debug("Querying zookeeper to see if broker %s is registered", node)
+        self.logger.debug("Querying zookeeper to see if broker %s is registered", str(node))
         broker_info = self.zk.query("/brokers/ids/%s" % self.idx(node), chroot=self.zk_chroot)
         self.logger.debug("Broker info: %s", broker_info)
         return broker_info is not None
diff --git a/tests/kafkatest/services/transactional_message_copier.py b/tests/kafkatest/services/transactional_message_copier.py
index 1a6a34c..dc972d7 100644
--- a/tests/kafkatest/services/transactional_message_copier.py
+++ b/tests/kafkatest/services/transactional_message_copier.py
@@ -46,13 +46,14 @@ class TransactionalMessageCopier(KafkaPathResolverMixin, BackgroundThreadService
     }
 
     def __init__(self, context, num_nodes, kafka, transactional_id, consumer_group,
-                 input_topic, input_partition, output_topic, max_messages = -1,
-                 transaction_size = 1000, enable_random_aborts=True):
+                 input_topic, input_partition, output_topic, max_messages=-1,
+                 transaction_size=1000, transaction_timeout=None, enable_random_aborts=True, use_group_metadata=False, group_mode=False):
         super(TransactionalMessageCopier, self).__init__(context, num_nodes)
         self.kafka = kafka
         self.transactional_id = transactional_id
         self.consumer_group = consumer_group
         self.transaction_size = transaction_size
+        self.transaction_timeout = transaction_timeout
         self.input_topic = input_topic
         self.input_partition = input_partition
         self.output_topic = output_topic
@@ -62,6 +63,8 @@ class TransactionalMessageCopier(KafkaPathResolverMixin, BackgroundThreadService
         self.remaining = -1
         self.stop_timeout_sec = 60
         self.enable_random_aborts = enable_random_aborts
+        self.use_group_metadata = use_group_metadata
+        self.group_mode = group_mode
         self.loggers = {
             "org.apache.kafka.clients.producer": "TRACE",
             "org.apache.kafka.clients.consumer": "TRACE"
@@ -120,9 +123,18 @@ class TransactionalMessageCopier(KafkaPathResolverMixin, BackgroundThreadService
         cmd += " --input-partition %s" % str(self.input_partition)
         cmd += " --transaction-size %s" % str(self.transaction_size)
 
+        if self.transaction_timeout is not None:
+            cmd += " --transaction-timeout %s" % str(self.transaction_timeout)
+
         if self.enable_random_aborts:
             cmd += " --enable-random-aborts"
 
+        if self.use_group_metadata:
+            cmd += " --use-group-metadata"
+
+        if self.group_mode:
+            cmd += " --group-mode"
+
         if self.max_messages > 0:
             cmd += " --max-messages %s" % str(self.max_messages)
         cmd += " 2>> %s | tee -a %s &" % (TransactionalMessageCopier.STDERR_CAPTURE, TransactionalMessageCopier.STDOUT_CAPTURE)
diff --git a/tests/kafkatest/services/verifiable_producer.py b/tests/kafkatest/services/verifiable_producer.py
index 893baa4..caa3961 100644
--- a/tests/kafkatest/services/verifiable_producer.py
+++ b/tests/kafkatest/services/verifiable_producer.py
@@ -91,6 +91,7 @@ class VerifiableProducer(KafkaPathResolverMixin, VerifiableClientMixin, Backgrou
         for node in self.nodes:
             node.version = version
         self.acked_values = []
+        self.acked_values_by_partition = {}
         self._last_acked_offsets = {}
         self.not_acked_values = []
         self.produced_count = {}
@@ -178,7 +179,13 @@ class VerifiableProducer(KafkaPathResolverMixin, VerifiableClientMixin, Backgrou
 
                     elif data["name"] == "producer_send_success":
                         partition = TopicPartition(data["topic"], data["partition"])
-                        self.acked_values.append(self.message_validator(data["value"]))
+                        value = self.message_validator(data["value"])
+                        self.acked_values.append(value)
+
+                        if partition not in self.acked_values_by_partition:
+                            self.acked_values_by_partition[partition] = []
+                        self.acked_values_by_partition[partition].append(value)
+
                         self._last_acked_offsets[partition] = data["offset"]
                         self.produced_count[idx] += 1
 
@@ -256,6 +263,11 @@ class VerifiableProducer(KafkaPathResolverMixin, VerifiableClientMixin, Backgrou
             return self.acked_values
 
     @property
+    def acked_by_partition(self):
+        with self.lock:
+            return self.acked_values_by_partition
+
+    @property
     def not_acked(self):
         with self.lock:
             return self.not_acked_values
diff --git a/tests/kafkatest/tests/core/transactions_test.py b/tests/kafkatest/tests/core/group_mode_transactions_test.py
similarity index 60%
copy from tests/kafkatest/tests/core/transactions_test.py
copy to tests/kafkatest/tests/core/group_mode_transactions_test.py
index 4da5960..deaa26d 100644
--- a/tests/kafkatest/tests/core/transactions_test.py
+++ b/tests/kafkatest/tests/core/group_mode_transactions_test.py
@@ -26,16 +26,18 @@ from ducktape.mark.resource import cluster
 from ducktape.utils.util import wait_until
 
 
-class TransactionsTest(Test):
-    """Tests transactions by transactionally copying data from a source topic to
-    a destination topic and killing the copy process as well as the broker
-    randomly through the process. In the end we verify that the final output
-    topic contains exactly one committed copy of each message in the input
-    topic
+class GroupModeTransactionsTest(Test):
+    """Essentially testing the same functionality as TransactionsTest by transactionally copying data
+    from a source topic to a destination topic and killing the copy process as well as the broker
+    randomly through the process. The major difference is that we choose to work as a collaborated
+    group with same topic subscription instead of individual copiers.
+
+    In the end we verify that the final output topic contains exactly one committed copy of
+    each message from the original producer.
     """
     def __init__(self, test_context):
         """:type test_context: ducktape.tests.test.TestContext"""
-        super(TransactionsTest, self).__init__(test_context=test_context)
+        super(GroupModeTransactionsTest, self).__init__(test_context=test_context)
 
         self.input_topic = "input-topic"
         self.output_topic = "output-topic"
@@ -43,11 +45,13 @@ class TransactionsTest(Test):
         self.num_brokers = 3
 
         # Test parameters
-        self.num_input_partitions = 2
-        self.num_output_partitions = 3
+        self.num_input_partitions = 9
+        self.num_output_partitions = 9
+        self.num_copiers = 3
         self.num_seed_messages = 100000
         self.transaction_size = 750
-        self.consumer_group = "transactions-test-consumer-group"
+        self.transaction_timeout = 10000
+        self.consumer_group = "grouped-transactions-test-consumer-group"
 
         self.zk = ZookeeperService(test_context, num_nodes=1)
         self.kafka = KafkaService(test_context,
@@ -65,20 +69,21 @@ class TransactionsTest(Test):
                                            topic=topic,
                                            message_validator=is_int,
                                            max_messages=num_seed_messages,
-                                           enable_idempotence=True)
+                                           enable_idempotence=True,
+                                           repeating_keys=self.num_input_partitions)
         seed_producer.start()
         wait_until(lambda: seed_producer.num_acked >= num_seed_messages,
                    timeout_sec=seed_timeout_sec,
-                   err_msg="Producer failed to produce messages %d in  %ds." %\
-                   (self.num_seed_messages, seed_timeout_sec))
-        return seed_producer.acked
+                   err_msg="Producer failed to produce messages %d in  %ds." % \
+                           (self.num_seed_messages, seed_timeout_sec))
+        return seed_producer.acked_by_partition
 
     def get_messages_from_topic(self, topic, num_messages):
         consumer = self.start_consumer(topic, group_id="verifying_consumer")
         return self.drain_consumer(consumer, num_messages)
 
     def bounce_brokers(self, clean_shutdown):
-       for node in self.kafka.nodes:
+        for node in self.kafka.nodes:
             if clean_shutdown:
                 self.kafka.restart_node(node, clean_shutdown = True)
             else:
@@ -89,7 +94,7 @@ class TransactionsTest(Test):
                            hard-killed broker %s" % str(node.account))
                 self.kafka.start_node(node)
 
-    def create_and_start_message_copier(self, input_topic, input_partition, output_topic, transactional_id):
+    def create_and_start_message_copier(self, input_topic, output_topic, transactional_id):
         message_copier = TransactionalMessageCopier(
             context=self.test_context,
             num_nodes=1,
@@ -97,10 +102,13 @@ class TransactionsTest(Test):
             transactional_id=transactional_id,
             consumer_group=self.consumer_group,
             input_topic=input_topic,
-            input_partition=input_partition,
+            input_partition=-1,
             output_topic=output_topic,
             max_messages=-1,
-            transaction_size=self.transaction_size
+            transaction_size=self.transaction_size,
+            transaction_timeout=self.transaction_timeout,
+            use_group_metadata=True,
+            group_mode=True
         )
         message_copier.start()
         wait_until(lambda: message_copier.alive(message_copier.nodes[0]),
@@ -108,13 +116,13 @@ class TransactionsTest(Test):
                    err_msg="Message copier failed to start after 10 s")
         return message_copier
 
-    def bounce_copiers(self, copiers, clean_shutdown):
+    def bounce_copiers(self, copiers, clean_shutdown, timeout_sec=240):
         for _ in range(3):
             for copier in copiers:
                 wait_until(lambda: copier.progress_percent() >= 20.0,
-                           timeout_sec=30,
-                           err_msg="%s : Message copier didn't make enough progress in 30s. Current progress: %s" \
-                           % (copier.transactional_id, str(copier.progress_percent())))
+                           timeout_sec=timeout_sec,
+                           err_msg="%s : Message copier didn't make enough progress in %ds. Current progress: %s" \
+                                   % (copier.transactional_id, timeout_sec, str(copier.progress_percent())))
                 self.logger.info("%s - progress: %s" % (copier.transactional_id,
                                                         str(copier.progress_percent())))
                 copier.restart(clean_shutdown)
@@ -125,28 +133,54 @@ class TransactionsTest(Test):
             copiers.append(self.create_and_start_message_copier(
                 input_topic=input_topic,
                 output_topic=output_topic,
-                input_partition=i,
                 transactional_id="copier-" + str(i)
             ))
         return copiers
 
+    @staticmethod
+    def valid_value_and_partition(msg):
+        """Method used to check whether the given message is a valid tab
+        separated value + partition
+
+        return value and partition as a size-two array represented tuple: [value, partition]
+        """
+        try:
+            splitted_msg = msg.split('\t')
+            tuple = [int(splitted_msg[0]), int(splitted_msg[1])]
+            return tuple
+
+        except ValueError:
+            raise Exception("Unexpected message format (expected a tab separated [value, partition] tuple). Message: %s" % (msg))
+
     def start_consumer(self, topic_to_read, group_id):
         consumer = ConsoleConsumer(context=self.test_context,
                                    num_nodes=1,
                                    kafka=self.kafka,
                                    topic=topic_to_read,
                                    group_id=group_id,
-                                   message_validator=is_int,
+                                   message_validator=self.valid_value_and_partition,
                                    from_beginning=True,
+                                   print_partition=True,
                                    isolation_level="read_committed")
         consumer.start()
         # ensure that the consumer is up.
         wait_until(lambda: (len(consumer.messages_consumed[1]) > 0) == True,
                    timeout_sec=60,
-                   err_msg="Consumer failed to consume any messages for %ds" %\
-                   60)
+                   err_msg="Consumer failed to consume any messages for %ds" % \
+                           60)
         return consumer
 
+    @staticmethod
+    def split_by_partition(messages_consumed):
+        messages_by_partition = {}
+
+        for msg in messages_consumed:
+            partition = msg[1]
+            if partition not in messages_by_partition:
+                messages_by_partition[partition] = []
+            messages_by_partition[partition].append(msg[0])
+        return messages_by_partition
+
     def drain_consumer(self, consumer, num_messages):
         # wait until we read at least the expected number of messages.
         # This is a safe check because both failure modes will be caught:
@@ -157,10 +191,10 @@ class TransactionsTest(Test):
         #     test to fail.
         wait_until(lambda: len(consumer.messages_consumed[1]) >= num_messages,
                    timeout_sec=90,
-                   err_msg="Consumer consumed only %d out of %d messages in %ds" %\
-                   (len(consumer.messages_consumed[1]), num_messages, 90))
+                   err_msg="Consumer consumed only %d out of %d messages in %ds" % \
+                           (len(consumer.messages_consumed[1]), num_messages, 90))
         consumer.stop()
-        return consumer.messages_consumed[1]
+        return self.split_by_partition(consumer.messages_consumed[1])
 
     def copy_messages_transactionally(self, failure_mode, bounce_target,
                                       input_topic, output_topic,
@@ -188,11 +222,12 @@ class TransactionsTest(Test):
         elif bounce_target == "clients":
             self.bounce_copiers(copiers, clean_shutdown)
 
+        copier_timeout_sec = 240
         for copier in copiers:
             wait_until(lambda: copier.is_done,
-                       timeout_sec=120,
-                       err_msg="%s - Failed to copy all messages in  %ds." %\
-                       (copier.transactional_id, 120))
+                       timeout_sec=copier_timeout_sec,
+                       err_msg="%s - Failed to copy all messages in %ds." % \
+                               (copier.transactional_id, copier_timeout_sec))
         self.logger.info("finished copying messages")
 
         return self.drain_consumer(concurrent_consumer, num_messages_to_copy)
@@ -215,53 +250,63 @@ class TransactionsTest(Test):
             }
         }
 
-    @cluster(num_nodes=9)
+    @cluster(num_nodes=10)
     @matrix(failure_mode=["hard_bounce", "clean_bounce"],
-            bounce_target=["brokers", "clients"],
-            check_order=[True, False])
-    def test_transactions(self, failure_mode, bounce_target, check_order):
+            bounce_target=["brokers", "clients"])
+    def test_transactions(self, failure_mode, bounce_target):
         security_protocol = 'PLAINTEXT'
         self.kafka.security_protocol = security_protocol
         self.kafka.interbroker_security_protocol = security_protocol
         self.kafka.logs["kafka_data_1"]["collect_default"] = True
         self.kafka.logs["kafka_data_2"]["collect_default"] = True
         self.kafka.logs["kafka_operational_logs_debug"]["collect_default"] = True
-        if check_order:
-            # To check ordering, we simply create input and output topics
-            # with a single partition.
-            # We reduce the number of seed messages to copy to account for the fewer output
-            # partitions, and thus lower parallelism. This helps keep the test
-            # time shorter.
-            self.num_seed_messages = self.num_seed_messages / 3
-            self.num_input_partitions = 1
-            self.num_output_partitions = 1
 
         self.setup_topics()
         self.kafka.start()
 
-        input_messages = self.seed_messages(self.input_topic, self.num_seed_messages)
-        concurrently_consumed_messages = self.copy_messages_transactionally(
+        input_messages_by_partition = self.seed_messages(self.input_topic, self.num_seed_messages)
+        concurrently_consumed_message_by_partition = self.copy_messages_transactionally(
             failure_mode, bounce_target, input_topic=self.input_topic,
-            output_topic=self.output_topic, num_copiers=self.num_input_partitions,
+            output_topic=self.output_topic, num_copiers=self.num_copiers,
             num_messages_to_copy=self.num_seed_messages)
-        output_messages = self.get_messages_from_topic(self.output_topic, self.num_seed_messages)
+        output_messages_by_partition = self.get_messages_from_topic(self.output_topic, self.num_seed_messages)
 
-        concurrently_consumed_message_set = set(concurrently_consumed_messages)
-        output_message_set = set(output_messages)
-        input_message_set = set(input_messages)
+        assert len(input_messages_by_partition) == \
+               len(concurrently_consumed_message_by_partition), "The lengths of partition count doesn't match: " \
+                                                                "input partitions count %d, " \
+                                                                "concurrently consumed partitions count %d" % \
+                                                                (len(input_messages_by_partition), len(concurrently_consumed_message_by_partition))
 
-        num_dups = abs(len(output_messages) - len(output_message_set))
-        num_dups_in_concurrent_consumer = abs(len(concurrently_consumed_messages)
+        assert len(input_messages_by_partition) == \
+               len(output_messages_by_partition), "The lengths of partition count doesn't match: " \
+                                                  "input partitions count %d, " \
+                                                  "output partitions count %d" % \
+                                                  (len(input_messages_by_partition), len(concurrently_consumed_message_by_partition))
+
+        for p in range(self.num_input_partitions):
+            if p not in input_messages_by_partition:
+                continue
+
+            assert p in output_messages_by_partition, "Partition %d not in output messages"
+            assert p in concurrently_consumed_message_by_partition, "Partition %d not in concurrently consumed messages"
+
+            output_message_set = set(output_messages_by_partition[p])
+            input_message_set = set(input_messages_by_partition[p])
+
+            concurrently_consumed_message_set = set(concurrently_consumed_message_by_partition[p])
+
+            num_dups = abs(len(output_messages) - len(output_message_set))
+            num_dups_in_concurrent_consumer = abs(len(concurrently_consumed_messages)
                                               - len(concurrently_consumed_message_set))
-        assert num_dups == 0, "Detected %d duplicates in the output stream" % num_dups
-        assert input_message_set == output_message_set, "Input and output message sets are not equal. Num input messages %d. Num output messages %d" %\
-            (len(input_message_set), len(output_message_set))
-
-        assert num_dups_in_concurrent_consumer == 0, "Detected %d dups in concurrently consumed messages" % num_dups_in_concurrent_consumer
-        assert input_message_set == concurrently_consumed_message_set, \
-            "Input and concurrently consumed output message sets are not equal. Num input messages: %d. Num concurrently_consumed_messages: %d" %\
-            (len(input_message_set), len(concurrently_consumed_message_set))
-        if check_order:
+            assert num_dups == 0, "Detected %d duplicates in the output stream" % num_dups
+            assert input_message_set == output_message_set, "Input and output message sets are not equal. Num input messages %d. Num output messages %d" % \
+                                                        (len(input_message_set), len(output_message_set))
+
+            assert num_dups_in_concurrent_consumer == 0, "Detected %d dups in concurrently consumed messages" % num_dups_in_concurrent_consumer
+            assert input_message_set == concurrently_consumed_message_set, \
+                "Input and concurrently consumed output message sets are not equal. Num input messages: %d. Num concurrently_consumed_messages: %d" % \
+                (len(input_message_set), len(concurrently_consumed_message_set))
+
             assert input_messages == sorted(input_messages), "The seed messages themselves were not in order"
             assert output_messages == input_messages, "Output messages are not in order"
             assert concurrently_consumed_messages == output_messages, "Concurrently consumed messages are not in order"
diff --git a/tests/kafkatest/tests/core/transactions_test.py b/tests/kafkatest/tests/core/transactions_test.py
index 4da5960..2889f84 100644
--- a/tests/kafkatest/tests/core/transactions_test.py
+++ b/tests/kafkatest/tests/core/transactions_test.py
@@ -31,7 +31,7 @@ class TransactionsTest(Test):
     a destination topic and killing the copy process as well as the broker
     randomly through the process. In the end we verify that the final output
     topic contains exactly one committed copy of each message in the input
-    topic
+    topic.
     """
     def __init__(self, test_context):
         """:type test_context: ducktape.tests.test.TestContext"""
@@ -47,6 +47,7 @@ class TransactionsTest(Test):
         self.num_output_partitions = 3
         self.num_seed_messages = 100000
         self.transaction_size = 750
+        self.transaction_timeout = 10000
         self.consumer_group = "transactions-test-consumer-group"
 
         self.zk = ZookeeperService(test_context, num_nodes=1)
@@ -69,7 +70,7 @@ class TransactionsTest(Test):
         seed_producer.start()
         wait_until(lambda: seed_producer.num_acked >= num_seed_messages,
                    timeout_sec=seed_timeout_sec,
-                   err_msg="Producer failed to produce messages %d in  %ds." %\
+                   err_msg="Producer failed to produce messages %d in %ds." %\
                    (self.num_seed_messages, seed_timeout_sec))
         return seed_producer.acked
 
@@ -89,7 +90,7 @@ class TransactionsTest(Test):
                            hard-killed broker %s" % str(node.account))
                 self.kafka.start_node(node)
 
-    def create_and_start_message_copier(self, input_topic, input_partition, output_topic, transactional_id):
+    def create_and_start_message_copier(self, input_topic, input_partition, output_topic, transactional_id, use_group_metadata):
         message_copier = TransactionalMessageCopier(
             context=self.test_context,
             num_nodes=1,
@@ -100,7 +101,9 @@ class TransactionsTest(Test):
             input_partition=input_partition,
             output_topic=output_topic,
             max_messages=-1,
-            transaction_size=self.transaction_size
+            transaction_size=self.transaction_size,
+            transaction_timeout=self.transaction_timeout,
+            use_group_metadata=use_group_metadata
         )
         message_copier.start()
         wait_until(lambda: message_copier.alive(message_copier.nodes[0]),
@@ -119,14 +122,15 @@ class TransactionsTest(Test):
                                                         str(copier.progress_percent())))
                 copier.restart(clean_shutdown)
 
-    def create_and_start_copiers(self, input_topic, output_topic, num_copiers):
+    def create_and_start_copiers(self, input_topic, output_topic, num_copiers, use_group_metadata):
         copiers = []
         for i in range(0, num_copiers):
             copiers.append(self.create_and_start_message_copier(
                 input_topic=input_topic,
                 output_topic=output_topic,
                 input_partition=i,
-                transactional_id="copier-" + str(i)
+                transactional_id="copier-" + str(i),
+                use_group_metadata=use_group_metadata
             ))
         return copiers
 
@@ -164,7 +168,8 @@ class TransactionsTest(Test):
 
     def copy_messages_transactionally(self, failure_mode, bounce_target,
                                       input_topic, output_topic,
-                                      num_copiers, num_messages_to_copy):
+                                      num_copiers, num_messages_to_copy,
+                                      use_group_metadata):
         """Copies messages transactionally from the seeded input topic to the
         output topic, either bouncing brokers or clients in a hard and soft
         way as it goes.
@@ -176,7 +181,8 @@ class TransactionsTest(Test):
         """
         copiers = self.create_and_start_copiers(input_topic=input_topic,
                                                 output_topic=output_topic,
-                                                num_copiers=num_copiers)
+                                                num_copiers=num_copiers,
+                                                use_group_metadata=use_group_metadata)
         concurrent_consumer = self.start_consumer(output_topic,
                                                   group_id="concurrent_consumer")
         clean_shutdown = False
@@ -188,11 +194,12 @@ class TransactionsTest(Test):
         elif bounce_target == "clients":
             self.bounce_copiers(copiers, clean_shutdown)
 
+        copier_timeout_sec = 120
         for copier in copiers:
             wait_until(lambda: copier.is_done,
-                       timeout_sec=120,
+                       timeout_sec=copier_timeout_sec,
                        err_msg="%s - Failed to copy all messages in  %ds." %\
-                       (copier.transactional_id, 120))
+                       (copier.transactional_id, copier_timeout_sec))
         self.logger.info("finished copying messages")
 
         return self.drain_consumer(concurrent_consumer, num_messages_to_copy)
@@ -218,8 +225,9 @@ class TransactionsTest(Test):
     @cluster(num_nodes=9)
     @matrix(failure_mode=["hard_bounce", "clean_bounce"],
             bounce_target=["brokers", "clients"],
-            check_order=[True, False])
-    def test_transactions(self, failure_mode, bounce_target, check_order):
+            check_order=[True, False],
+            use_group_metadata=[True, False])
+    def test_transactions(self, failure_mode, bounce_target, check_order, use_group_metadata):
         security_protocol = 'PLAINTEXT'
         self.kafka.security_protocol = security_protocol
         self.kafka.interbroker_security_protocol = security_protocol
@@ -243,7 +251,7 @@ class TransactionsTest(Test):
         concurrently_consumed_messages = self.copy_messages_transactionally(
             failure_mode, bounce_target, input_topic=self.input_topic,
             output_topic=self.output_topic, num_copiers=self.num_input_partitions,
-            num_messages_to_copy=self.num_seed_messages)
+            num_messages_to_copy=self.num_seed_messages, use_group_metadata=use_group_metadata)
         output_messages = self.get_messages_from_topic(self.output_topic, self.num_seed_messages)
 
         concurrently_consumed_message_set = set(concurrently_consumed_messages)
diff --git a/tools/src/main/java/org/apache/kafka/tools/TransactionalMessageCopier.java b/tools/src/main/java/org/apache/kafka/tools/TransactionalMessageCopier.java
index 1ea826d..13c18d0 100644
--- a/tools/src/main/java/org/apache/kafka/tools/TransactionalMessageCopier.java
+++ b/tools/src/main/java/org/apache/kafka/tools/TransactionalMessageCopier.java
@@ -22,6 +22,7 @@ import net.sourceforge.argparse4j.ArgumentParsers;
 import net.sourceforge.argparse4j.inf.ArgumentParser;
 import net.sourceforge.argparse4j.inf.Namespace;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
@@ -35,8 +36,12 @@ import org.apache.kafka.common.errors.OutOfOrderSequenceException;
 import org.apache.kafka.common.errors.ProducerFencedException;
 import org.apache.kafka.common.utils.Exit;
 
-import java.io.IOException;
+import java.text.DateFormat;
+import java.text.SimpleDateFormat;
 import java.time.Duration;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Date;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Properties;
@@ -54,6 +59,8 @@ import static net.sourceforge.argparse4j.impl.Arguments.storeTrue;
  */
 public class TransactionalMessageCopier {
 
+    private static final DateFormat FORMAT = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss:SSS");
+
     /** Get the command-line argument parser. */
     private static ArgumentParser argParser() {
         ArgumentParser parser = ArgumentParsers
@@ -122,6 +129,15 @@ public class TransactionalMessageCopier {
                 .dest("messagesPerTransaction")
                 .help("The number of messages to put in each transaction. Default is 200.");
 
+        parser.addArgument("--transaction-timeout")
+                .action(store())
+                .required(false)
+                .setDefault(60000)
+                .type(Integer.class)
+                .metavar("TRANSACTION-TIMEOUT")
+                .dest("transactionTimeout")
+                .help("The transaction timeout in milliseconds. Default is 60000(1 minute).");
+
         parser.addArgument("--transactional-id")
                 .action(store())
                 .required(true)
@@ -137,16 +153,28 @@ public class TransactionalMessageCopier {
                 .dest("enableRandomAborts")
                 .help("Whether or not to enable random transaction aborts (for system testing)");
 
+        parser.addArgument("--group-mode")
+                .action(storeTrue())
+                .type(Boolean.class)
+                .metavar("GROUP-MODE")
+                .dest("groupMode")
+                .help("Whether to let consumer subscribe to the input topic or do manual assign. If we do" +
+                          " subscription based consumption, the input partition shall be ignored");
+
+        parser.addArgument("--use-group-metadata")
+                .action(storeTrue())
+                .type(Boolean.class)
+                .metavar("USE-GROUP-METADATA")
+                .dest("useGroupMetadata")
+                .help("Whether to use the new transactional commit API with group metadata");
+
         return parser;
     }
 
     private static KafkaProducer<String, String> createProducer(Namespace parsedArgs) {
-        String transactionalId = parsedArgs.getString("transactionalId");
-        String brokerList = parsedArgs.getString("brokerList");
-
         Properties props = new Properties();
-        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
-        props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, transactionalId);
+        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, parsedArgs.getString("brokerList"));
+        props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, parsedArgs.getString("transactionalId"));
         props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                 "org.apache.kafka.common.serialization.StringSerializer");
         props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
@@ -156,6 +184,7 @@ public class TransactionalMessageCopier {
         // the case with multiple inflights.
         props.put(ProducerConfig.BATCH_SIZE_CONFIG, "512");
         props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "5");
+        props.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, parsedArgs.getInt("transactionTimeout"));
 
         return new KafkaProducer<>(props);
     }
@@ -173,6 +202,7 @@ public class TransactionalMessageCopier {
         props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, numMessagesPerTransaction.toString());
         props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
         props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "10000");
+        props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "180000");
         props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "3000");
         props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
         props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
@@ -184,7 +214,7 @@ public class TransactionalMessageCopier {
     }
 
     private static ProducerRecord<String, String> producerRecordFromConsumerRecord(String topic, ConsumerRecord<String, String> record) {
-        return new ProducerRecord<>(topic, record.key(), record.value());
+        return new ProducerRecord<>(topic, record.partition(), record.key(), record.value());
     }
 
     private static Map<TopicPartition, OffsetAndMetadata> consumerPositions(KafkaConsumer<String, String> consumer) {
@@ -226,45 +256,72 @@ public class TransactionalMessageCopier {
         return json;
     }
 
-    private static String statusAsJson(long consumed, long remaining, String transactionalId) {
+    private static synchronized String statusAsJson(long totalProcessed, long consumedSinceLastRebalanced, long remaining, String transactionalId, String stage) {
         Map<String, Object> statusData = new HashMap<>();
         statusData.put("progress", transactionalId);
-        statusData.put("consumed", consumed);
+        statusData.put("totalProcessed", totalProcessed);
+        statusData.put("consumed", consumedSinceLastRebalanced);
         statusData.put("remaining", remaining);
+        statusData.put("time", FORMAT.format(new Date()));
+        statusData.put("stage", stage);
         return toJsonString(statusData);
     }
 
-    private static String shutDownString(long consumed, long remaining, String transactionalId) {
+    private static synchronized String shutDownString(long totalProcessed, long consumedSinceLastRebalanced, long remaining, String transactionalId) {
         Map<String, Object> shutdownData = new HashMap<>();
-        shutdownData.put("remaining", remaining);
-        shutdownData.put("consumed", consumed);
         shutdownData.put("shutdown_complete", transactionalId);
+        shutdownData.put("totalProcessed", totalProcessed);
+        shutdownData.put("consumed", consumedSinceLastRebalanced);
+        shutdownData.put("remaining", remaining);
+        shutdownData.put("time", FORMAT.format(new Date()));
         return toJsonString(shutdownData);
     }
 
-    public static void main(String[] args) throws IOException {
+    public static void main(String[] args) {
         Namespace parsedArgs = argParser().parseArgsOrFail(args);
-        Integer numMessagesPerTransaction = parsedArgs.getInt("messagesPerTransaction");
         final String transactionalId = parsedArgs.getString("transactionalId");
         final String outputTopic = parsedArgs.getString("outputTopic");
 
         String consumerGroup = parsedArgs.getString("consumerGroup");
-        TopicPartition inputPartition = new TopicPartition(parsedArgs.getString("inputTopic"), parsedArgs.getInt("inputPartition"));
 
         final KafkaProducer<String, String> producer = createProducer(parsedArgs);
         final KafkaConsumer<String, String> consumer = createConsumer(parsedArgs);
 
-        consumer.assign(singleton(inputPartition));
+        final AtomicLong remainingMessages = new AtomicLong(
+            parsedArgs.getInt("maxMessages") == -1 ? Long.MAX_VALUE : parsedArgs.getInt("maxMessages"));
+
+        boolean groupMode = parsedArgs.getBoolean("groupMode");
+        String topicName = parsedArgs.getString("inputTopic");
+        final AtomicLong numMessagesProcessedSinceLastRebalance = new AtomicLong(0);
+        final AtomicLong totalMessageProcessed = new AtomicLong(0);
+        if (groupMode) {
+            consumer.subscribe(Collections.singleton(topicName), new ConsumerRebalanceListener() {
+                @Override
+                public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
+                }
+
+                @Override
+                public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
+                    remainingMessages.set(partitions.stream()
+                        .mapToLong(partition -> messagesRemaining(consumer, partition)).sum());
+                    numMessagesProcessedSinceLastRebalance.set(0);
+                    // We use message cap for remaining here as the remainingMessages are not set yet.
+                    System.out.println(statusAsJson(totalMessageProcessed.get(),
+                        numMessagesProcessedSinceLastRebalance.get(), remainingMessages.get(), transactionalId, "RebalanceComplete"));
+                }
+            });
+        } else {
+            TopicPartition inputPartition = new TopicPartition(topicName, parsedArgs.getInt("inputPartition"));
+            consumer.assign(singleton(inputPartition));
+            remainingMessages.set(Math.min(messagesRemaining(consumer, inputPartition), remainingMessages.get()));
+        }
 
-        long maxMessages = parsedArgs.getInt("maxMessages") == -1 ? Long.MAX_VALUE : parsedArgs.getInt("maxMessages");
-        maxMessages = Math.min(messagesRemaining(consumer, inputPartition), maxMessages);
         final boolean enableRandomAborts = parsedArgs.getBoolean("enableRandomAborts");
 
         producer.initTransactions();
 
         final AtomicBoolean isShuttingDown = new AtomicBoolean(false);
-        final AtomicLong remainingMessages = new AtomicLong(maxMessages);
-        final AtomicLong numMessagesProcessed = new AtomicLong(0);
+
         Exit.addShutdownHook("transactional-message-copier-shutdown-hook", () -> {
             isShuttingDown.set(true);
             // Flush any remaining messages
@@ -272,41 +329,51 @@ public class TransactionalMessageCopier {
             synchronized (consumer) {
                 consumer.close();
             }
-            System.out.println(shutDownString(numMessagesProcessed.get(), remainingMessages.get(), transactionalId));
+            System.out.println(shutDownString(totalMessageProcessed.get(),
+                numMessagesProcessedSinceLastRebalance.get(), remainingMessages.get(), transactionalId));
         });
 
+        final boolean useGroupMetadata = parsedArgs.getBoolean("useGroupMetadata");
         try {
             Random random = new Random();
-            while (0 < remainingMessages.get()) {
-                System.out.println(statusAsJson(numMessagesProcessed.get(), remainingMessages.get(), transactionalId));
+            while (remainingMessages.get() > 0) {
+                System.out.println(statusAsJson(totalMessageProcessed.get(),
+                    numMessagesProcessedSinceLastRebalance.get(), remainingMessages.get(), transactionalId, "ProcessLoop"));
                 if (isShuttingDown.get())
                     break;
-                int messagesInCurrentTransaction = 0;
-                long numMessagesForNextTransaction = Math.min(numMessagesPerTransaction, remainingMessages.get());
 
-                try {
-                    producer.beginTransaction();
-                    while (messagesInCurrentTransaction < numMessagesForNextTransaction) {
-                        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(200));
+                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(200));
+                if (records.count() > 0) {
+                    try {
+                        producer.beginTransaction();
+
                         for (ConsumerRecord<String, String> record : records) {
                             producer.send(producerRecordFromConsumerRecord(outputTopic, record));
-                            messagesInCurrentTransaction++;
                         }
-                    }
-                    producer.sendOffsetsToTransaction(consumerPositions(consumer), consumerGroup);
 
-                    if (enableRandomAborts && random.nextInt() % 3 == 0) {
-                        throw new KafkaException("Aborting transaction");
-                    } else {
-                        producer.commitTransaction();
-                        remainingMessages.set(maxMessages - numMessagesProcessed.addAndGet(messagesInCurrentTransaction));
+                        long messagesSentWithinCurrentTxn = records.count();
+
+                        if (useGroupMetadata) {
+                            producer.sendOffsetsToTransaction(consumerPositions(consumer), consumer.groupMetadata());
+                        } else {
+                            producer.sendOffsetsToTransaction(consumerPositions(consumer), consumerGroup);
+                        }
+
+                        if (enableRandomAborts && random.nextInt() % 3 == 0) {
+                            throw new KafkaException("Aborting transaction");
+                        } else {
+                            producer.commitTransaction();
+                            remainingMessages.getAndAdd(-messagesSentWithinCurrentTxn);
+                            numMessagesProcessedSinceLastRebalance.getAndAdd(messagesSentWithinCurrentTxn);
+                            totalMessageProcessed.getAndAdd(messagesSentWithinCurrentTxn);
+                        }
+                    } catch (ProducerFencedException | OutOfOrderSequenceException e) {
+                        // We cannot recover from these errors, so just rethrow them and let the process fail
+                        throw e;
+                    } catch (KafkaException e) {
+                        producer.abortTransaction();
+                        resetToLastCommittedPositions(consumer);
                     }
-                } catch (ProducerFencedException | OutOfOrderSequenceException e) {
-                    // We cannot recover from these errors, so just rethrow them and let the process fail
-                    throw e;
-                } catch (KafkaException e) {
-                    producer.abortTransaction();
-                    resetToLastCommittedPositions(consumer);
                 }
             }
         } finally {
diff --git a/tools/src/main/java/org/apache/kafka/tools/VerifiableConsumer.java b/tools/src/main/java/org/apache/kafka/tools/VerifiableConsumer.java
index 9cad90f..f34b9e2 100644
--- a/tools/src/main/java/org/apache/kafka/tools/VerifiableConsumer.java
+++ b/tools/src/main/java/org/apache/kafka/tools/VerifiableConsumer.java
@@ -159,8 +159,9 @@ public class VerifiableConsumer implements Closeable, OffsetCommitCallback, Cons
                     partitionRecords.size(), minOffset, maxOffset));
 
             if (verbose) {
-                for (ConsumerRecord<String, String> record : partitionRecords)
+                for (ConsumerRecord<String, String> record : partitionRecords) {
                     printJson(new RecordData(record));
+                }
             }
 
             consumedMessages += partitionRecords.size();
@@ -595,10 +596,7 @@ public class VerifiableConsumer implements Closeable, OffsetCommitCallback, Cons
     public static VerifiableConsumer createFromArgs(ArgumentParser parser, String[] args) throws ArgumentParserException {
         Namespace res = parser.parseArgs(args);
 
-        String topic = res.getString("topic");
         boolean useAutoCommit = res.getBoolean("useAutoCommit");
-        int maxMessages = res.getInt("maxMessages");
-        boolean verbose = res.getBoolean("verbose");
         String configFile = res.getString("consumer.config");
 
         Properties consumerProps = new Properties();
@@ -625,6 +623,10 @@ public class VerifiableConsumer implements Closeable, OffsetCommitCallback, Cons
         StringDeserializer deserializer = new StringDeserializer();
         KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps, deserializer, deserializer);
 
+        String topic = res.getString("topic");
+        int maxMessages = res.getInt("maxMessages");
+        boolean verbose = res.getBoolean("verbose");
+
         return new VerifiableConsumer(
                 consumer,
                 System.out,


[kafka] 01/03: KAFKA-9447: Add new customized EOS model example (#8031)

Posted by gu...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch 2.5
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit a4b2a086f8639dece77a95b47bbb79840a455150
Author: Boyang Chen <bo...@confluent.io>
AuthorDate: Wed Feb 5 16:51:08 2020 -0800

    KAFKA-9447: Add new customized EOS model example (#8031)
    
    With the improvement of 447, we are now offering developers a better experience on writing their customized EOS apps with group subscription, instead of manual assignments. With the demo, user should be able to get started more quickly on writing their own EOS app, and understand the processing logic much better.
    
    Reviewers: Guozhang Wang <wa...@gmail.com>
---
 .../kafka/clients/admin/DescribeTopicsResult.java  |  23 +--
 .../kafka/clients/admin/ListTopicsResult.java      |  14 +-
 examples/README                                    |   8 +-
 examples/bin/exactly-once-demo.sh                  |  23 +++
 .../src/main/java/kafka/examples/Consumer.java     |  32 +++-
 .../examples/ExactlyOnceMessageProcessor.java      | 209 +++++++++++++++++++++
 .../kafka/examples/KafkaConsumerProducerDemo.java  |  14 +-
 .../java/kafka/examples/KafkaExactlyOnceDemo.java  | 185 ++++++++++++++++++
 .../src/main/java/kafka/examples/Producer.java     |  42 ++++-
 9 files changed, 508 insertions(+), 42 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeTopicsResult.java b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeTopicsResult.java
index 34698b9..7753984 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeTopicsResult.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeTopicsResult.java
@@ -51,21 +51,18 @@ public class DescribeTopicsResult {
      */
     public KafkaFuture<Map<String, TopicDescription>> all() {
         return KafkaFuture.allOf(futures.values().toArray(new KafkaFuture[0])).
-            thenApply(new KafkaFuture.BaseFunction<Void, Map<String, TopicDescription>>() {
-                @Override
-                public Map<String, TopicDescription> apply(Void v) {
-                    Map<String, TopicDescription> descriptions = new HashMap<>(futures.size());
-                    for (Map.Entry<String, KafkaFuture<TopicDescription>> entry : futures.entrySet()) {
-                        try {
-                            descriptions.put(entry.getKey(), entry.getValue().get());
-                        } catch (InterruptedException | ExecutionException e) {
-                            // This should be unreachable, because allOf ensured that all the futures
-                            // completed successfully.
-                            throw new RuntimeException(e);
-                        }
+            thenApply(v -> {
+                Map<String, TopicDescription> descriptions = new HashMap<>(futures.size());
+                for (Map.Entry<String, KafkaFuture<TopicDescription>> entry : futures.entrySet()) {
+                    try {
+                        descriptions.put(entry.getKey(), entry.getValue().get());
+                    } catch (InterruptedException | ExecutionException e) {
+                        // This should be unreachable, because allOf ensured that all the futures
+                        // completed successfully.
+                        throw new RuntimeException(e);
                     }
-                    return descriptions;
                 }
+                return descriptions;
             });
     }
 }
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/ListTopicsResult.java b/clients/src/main/java/org/apache/kafka/clients/admin/ListTopicsResult.java
index 4e7e1a2..2154073 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/ListTopicsResult.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/ListTopicsResult.java
@@ -48,23 +48,13 @@ public class ListTopicsResult {
      * Return a future which yields a collection of TopicListing objects.
      */
     public KafkaFuture<Collection<TopicListing>> listings() {
-        return future.thenApply(new KafkaFuture.BaseFunction<Map<String, TopicListing>, Collection<TopicListing>>() {
-            @Override
-            public Collection<TopicListing> apply(Map<String, TopicListing> namesToDescriptions) {
-                return namesToDescriptions.values();
-            }
-        });
+        return future.thenApply(namesToDescriptions -> namesToDescriptions.values());
     }
 
     /**
      * Return a future which yields a collection of topic names.
      */
     public KafkaFuture<Set<String>> names() {
-        return future.thenApply(new KafkaFuture.BaseFunction<Map<String, TopicListing>, Set<String>>() {
-            @Override
-            public Set<String> apply(Map<String, TopicListing> namesToListings) {
-                return namesToListings.keySet();
-            }
-        });
+        return future.thenApply(namesToListings -> namesToListings.keySet());
     }
 }
diff --git a/examples/README b/examples/README
index f6e3410..2efe71a 100644
--- a/examples/README
+++ b/examples/README
@@ -6,4 +6,10 @@ To run the demo:
    2. For simple consumer demo, `run bin/java-simple-consumer-demo.sh`
    3. For unlimited sync-producer-consumer run, `run bin/java-producer-consumer-demo.sh sync`
    4. For unlimited async-producer-consumer run, `run bin/java-producer-consumer-demo.sh`
-
+   5. For standalone mode exactly once demo run, `run bin/exactly-once-demo.sh standaloneMode 6 3 50000`,
+      this means we are starting 3 EOS instances with 6 topic partitions and 50000 pre-populated records
+   6. For group mode exactly once demo run, `run bin/exactly-once-demo.sh groupMode 6 3 50000`,
+      this means the same as the standalone demo, except consumers are using subscription mode.
+   7. Some notes for exactly once demo:
+      7.1. The Kafka server has to be on broker version 2.5 or higher to be able to run group mode.
+      7.2. You could also use Intellij to run the example directly by configuring parameters as "Program arguments"
diff --git a/examples/bin/exactly-once-demo.sh b/examples/bin/exactly-once-demo.sh
new file mode 100755
index 0000000..e9faa42
--- /dev/null
+++ b/examples/bin/exactly-once-demo.sh
@@ -0,0 +1,23 @@
+#!/bin/bash
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+base_dir=$(dirname $0)/../..
+
+if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
+    export KAFKA_HEAP_OPTS="-Xmx512M"
+fi
+
+exec $base_dir/bin/kafka-run-class.sh kafka.examples.KafkaExactlyOnceDemo $@
diff --git a/examples/src/main/java/kafka/examples/Consumer.java b/examples/src/main/java/kafka/examples/Consumer.java
index 26d6e23..19cb67c 100644
--- a/examples/src/main/java/kafka/examples/Consumer.java
+++ b/examples/src/main/java/kafka/examples/Consumer.java
@@ -25,24 +25,45 @@ import org.apache.kafka.clients.consumer.KafkaConsumer;
 import java.time.Duration;
 import java.util.Collections;
 import java.util.Properties;
+import java.util.concurrent.CountDownLatch;
 
 public class Consumer extends ShutdownableThread {
     private final KafkaConsumer<Integer, String> consumer;
     private final String topic;
+    private final String groupId;
+    private final int numMessageToConsume;
+    private int messageRemaining;
+    private final CountDownLatch latch;
 
-    public Consumer(String topic) {
+    public Consumer(final String topic,
+                    final String groupId,
+                    final boolean readCommitted,
+                    final int numMessageToConsume,
+                    final CountDownLatch latch) {
         super("KafkaConsumerExample", false);
+        this.groupId = groupId;
         Properties props = new Properties();
         props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KafkaProperties.KAFKA_SERVER_URL + ":" + KafkaProperties.KAFKA_SERVER_PORT);
-        props.put(ConsumerConfig.GROUP_ID_CONFIG, "DemoConsumer");
+        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
         props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
         props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
         props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
         props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.IntegerDeserializer");
         props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
+        if (readCommitted) {
+            props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
+        }
+        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
 
         consumer = new KafkaConsumer<>(props);
         this.topic = topic;
+        this.numMessageToConsume = numMessageToConsume;
+        this.messageRemaining = numMessageToConsume;
+        this.latch = latch;
+    }
+
+    KafkaConsumer<Integer, String> get() {
+        return consumer;
     }
 
     @Override
@@ -50,7 +71,12 @@ public class Consumer extends ShutdownableThread {
         consumer.subscribe(Collections.singletonList(this.topic));
         ConsumerRecords<Integer, String> records = consumer.poll(Duration.ofSeconds(1));
         for (ConsumerRecord<Integer, String> record : records) {
-            System.out.println("Received message: (" + record.key() + ", " + record.value() + ") at offset " + record.offset());
+            System.out.println(groupId + " received message : from partition " + record.partition() + ", (" + record.key() + ", " + record.value() + ") at offset " + record.offset());
+        }
+        messageRemaining -= records.count();
+        if (messageRemaining <= 0) {
+            System.out.println(groupId + " finished reading " + numMessageToConsume + " messages");
+            latch.countDown();
         }
     }
 
diff --git a/examples/src/main/java/kafka/examples/ExactlyOnceMessageProcessor.java b/examples/src/main/java/kafka/examples/ExactlyOnceMessageProcessor.java
new file mode 100644
index 0000000..53685f3
--- /dev/null
+++ b/examples/src/main/java/kafka/examples/ExactlyOnceMessageProcessor.java
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.examples;
+
+import org.apache.kafka.clients.consumer.CommitFailedException;
+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.FencedInstanceIdException;
+import org.apache.kafka.common.errors.ProducerFencedException;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * A demo class for how to write a customized EOS app. It takes a consume-process-produce loop.
+ * Important configurations and APIs are commented.
+ */
+public class ExactlyOnceMessageProcessor extends Thread {
+
+    private static final boolean READ_COMMITTED = true;
+
+    private final String mode;
+    private final String inputTopic;
+    private final String outputTopic;
+    private final String consumerGroupId;
+    private final int numPartitions;
+    private final int numInstances;
+    private final int instanceIdx;
+    private final String transactionalId;
+
+    private final KafkaProducer<Integer, String> producer;
+    private final KafkaConsumer<Integer, String> consumer;
+
+    private final CountDownLatch latch;
+
+    public ExactlyOnceMessageProcessor(final String mode,
+                                       final String inputTopic,
+                                       final String outputTopic,
+                                       final int numPartitions,
+                                       final int numInstances,
+                                       final int instanceIdx,
+                                       final CountDownLatch latch) {
+        this.mode = mode;
+        this.inputTopic = inputTopic;
+        this.outputTopic = outputTopic;
+        this.consumerGroupId = "Eos-consumer";
+        this.numPartitions = numPartitions;
+        this.numInstances = numInstances;
+        this.instanceIdx = instanceIdx;
+        this.transactionalId = "Processor-" + instanceIdx;
+        // A unique transactional.id must be provided in order to properly use EOS.
+        producer = new Producer(outputTopic, true, transactionalId, true, -1, null).get();
+        // Consumer must be in read_committed mode, which means it won't be able to read uncommitted data.
+        consumer = new Consumer(inputTopic, consumerGroupId, READ_COMMITTED, -1, null).get();
+        this.latch = latch;
+    }
+
+    @Override
+    public void run() {
+        // Init transactions call should always happen first in order to clear zombie transactions from previous generation.
+        producer.initTransactions();
+
+        final AtomicLong messageRemaining = new AtomicLong(Long.MAX_VALUE);
+
+        // Under group mode, topic based subscription is sufficient as EOS apps are safe to cooperate transactionally after 2.5.
+        // Under standalone mode, user needs to manually assign the topic partitions and make sure the assignment is unique
+        // across the consumer group instances.
+        if (this.mode.equals("groupMode")) {
+            consumer.subscribe(Collections.singleton(inputTopic), new ConsumerRebalanceListener() {
+                @Override
+                public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
+                    printWithTxnId("Revoked partition assignment to kick-off rebalancing: " + partitions);
+                }
+
+                @Override
+                public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
+                    printWithTxnId("Received partition assignment after rebalancing: " + partitions);
+                    messageRemaining.set(messagesRemaining(consumer));
+                }
+            });
+        } else {
+            // Do a range assignment of topic partitions.
+            List<TopicPartition> topicPartitions = new ArrayList<>();
+            int rangeSize = numPartitions / numInstances;
+            int startPartition = rangeSize * instanceIdx;
+            int endPartition = Math.min(numPartitions - 1, startPartition + rangeSize - 1);
+            for (int partition = startPartition; partition <= endPartition; partition++) {
+                topicPartitions.add(new TopicPartition(inputTopic, partition));
+            }
+
+            consumer.assign(topicPartitions);
+            printWithTxnId("Manually assign partitions: " + topicPartitions);
+        }
+
+        int messageProcessed = 0;
+        boolean abortPreviousTransaction = false;
+        while (messageRemaining.get() > 0) {
+            ConsumerRecords<Integer, String> records = consumer.poll(Duration.ofMillis(200));
+            if (records.count() > 0) {
+                try {
+                    // Abort previous transaction if instructed.
+                    if (abortPreviousTransaction) {
+                        producer.abortTransaction();
+                        // The consumer fetch position also needs to be reset.
+                        resetToLastCommittedPositions(consumer);
+                        abortPreviousTransaction = false;
+                    }
+                    // Begin a new transaction session.
+                    producer.beginTransaction();
+                    for (ConsumerRecord<Integer, String> record : records) {
+                        // Process the record and send to downstream.
+                        ProducerRecord<Integer, String> customizedRecord = transform(record);
+                        producer.send(customizedRecord);
+                    }
+                    Map<TopicPartition, OffsetAndMetadata> positions = new HashMap<>();
+                    for (TopicPartition topicPartition : consumer.assignment()) {
+                        positions.put(topicPartition, new OffsetAndMetadata(consumer.position(topicPartition), null));
+                    }
+                    // Checkpoint the progress by sending offsets to group coordinator broker.
+                    // Under group mode, we must apply consumer group metadata for proper fencing.
+                    if (this.mode.equals("groupMode")) {
+                        producer.sendOffsetsToTransaction(positions, consumer.groupMetadata());
+                    } else {
+                        producer.sendOffsetsToTransaction(positions, consumerGroupId);
+                    }
+
+                    // Finish the transaction. All sent records should be visible for consumption now.
+                    producer.commitTransaction();
+                    messageProcessed += records.count();
+                } catch (CommitFailedException e) {
+                    // In case of a retriable exception, suggest aborting the ongoing transaction for correctness.
+                    abortPreviousTransaction = true;
+                } catch (ProducerFencedException | FencedInstanceIdException e) {
+                    throw new KafkaException("Encountered fatal error during processing: " + e.getMessage());
+                }
+            }
+            messageRemaining.set(messagesRemaining(consumer));
+            printWithTxnId("Message remaining: " + messageRemaining);
+        }
+
+        printWithTxnId("Finished processing " + messageProcessed + " records");
+        latch.countDown();
+    }
+
+    private void printWithTxnId(final String message) {
+        System.out.println(transactionalId + ": " + message);
+    }
+
+    private ProducerRecord<Integer, String> transform(final ConsumerRecord<Integer, String> record) {
+        printWithTxnId("Transformed record (" + record.key() + "," + record.value() + ")");
+        return new ProducerRecord<>(outputTopic, record.key() / 2, "Transformed_" + record.value());
+    }
+
+    private long messagesRemaining(final KafkaConsumer<Integer, String> consumer) {
+        final Map<TopicPartition, Long> fullEndOffsets = consumer.endOffsets(new ArrayList<>(consumer.assignment()));
+        // If we couldn't detect any end offset, that means we are still not able to fetch offsets.
+        if (fullEndOffsets.isEmpty()) {
+            return Long.MAX_VALUE;
+        }
+
+        return consumer.assignment().stream().mapToLong(partition -> {
+            long currentPosition = consumer.position(partition);
+            printWithTxnId("Processing partition " + partition + " with full offsets " + fullEndOffsets);
+            if (fullEndOffsets.containsKey(partition)) {
+                return fullEndOffsets.get(partition) - currentPosition;
+            }
+            return 0;
+        }).sum();
+    }
+
+    private static void resetToLastCommittedPositions(KafkaConsumer<Integer, String> consumer) {
+        final Map<TopicPartition, OffsetAndMetadata> committed = consumer.committed(consumer.assignment());
+        consumer.assignment().forEach(tp -> {
+            OffsetAndMetadata offsetAndMetadata = committed.get(tp);
+            if (offsetAndMetadata != null)
+                consumer.seek(tp, offsetAndMetadata.offset());
+            else
+                consumer.seekToBeginning(Collections.singleton(tp));
+        });
+    }
+}
diff --git a/examples/src/main/java/kafka/examples/KafkaConsumerProducerDemo.java b/examples/src/main/java/kafka/examples/KafkaConsumerProducerDemo.java
index f42ed6f..561732b 100644
--- a/examples/src/main/java/kafka/examples/KafkaConsumerProducerDemo.java
+++ b/examples/src/main/java/kafka/examples/KafkaConsumerProducerDemo.java
@@ -16,14 +16,20 @@
  */
 package kafka.examples;
 
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
 public class KafkaConsumerProducerDemo {
-    public static void main(String[] args) {
+    public static void main(String[] args) throws InterruptedException {
         boolean isAsync = args.length == 0 || !args[0].trim().equalsIgnoreCase("sync");
-        Producer producerThread = new Producer(KafkaProperties.TOPIC, isAsync);
+        CountDownLatch latch = new CountDownLatch(2);
+        Producer producerThread = new Producer(KafkaProperties.TOPIC, isAsync, null, false, 10000, latch);
         producerThread.start();
 
-        Consumer consumerThread = new Consumer(KafkaProperties.TOPIC);
+        Consumer consumerThread = new Consumer(KafkaProperties.TOPIC, "DemoConsumer", false, 10000, latch);
         consumerThread.start();
-
+        latch.await(5, TimeUnit.MINUTES);
+        consumerThread.shutdown();
+        System.out.println("All finished!");
     }
 }
diff --git a/examples/src/main/java/kafka/examples/KafkaExactlyOnceDemo.java b/examples/src/main/java/kafka/examples/KafkaExactlyOnceDemo.java
new file mode 100644
index 0000000..d418eba
--- /dev/null
+++ b/examples/src/main/java/kafka/examples/KafkaExactlyOnceDemo.java
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.examples;
+
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.errors.TopicExistsException;
+import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * This exactly once demo driver takes 4 arguments:
+ *   - mode: whether to run as standalone app, or a group
+ *   - partition: number of partitions for input/output topic
+ *   - instances: number of instances
+ *   - records: number of records
+ * An example argument list would be `groupMode 6 3 50000`
+ *
+ * The driver could be decomposed as following stages:
+ *
+ * 1. Cleanup any topic whose name conflicts with input and output topic, so that we have a clean-start.
+ *
+ * 2. Set up a producer in a separate thread to pre-populate a set of records with even number keys into
+ *    the input topic. The driver will block for the record generation to finish, so the producer
+ *    must be in synchronous sending mode.
+ *
+ * 3. Set up transactional instances in separate threads which does a consume-process-produce loop,
+ *    tailing data from input topic (See {@link ExactlyOnceMessageProcessor}). Each EOS instance will
+ *    drain all the records from either given partitions or auto assigned partitions by actively
+ *    comparing log end offset with committed offset. Each record will be processed exactly once
+ *    as dividing the key by 2, and extend the value message. The driver will block for all the record
+ *    processing to finish. The transformed record shall be written to the output topic, with
+ *    transactional guarantee.
+ *
+ * 4. Set up a read committed consumer in a separate thread to verify we have all records within
+ *    the output topic, while the message ordering on partition level is maintained.
+ *    The driver will block for the consumption of all committed records.
+ *
+ * From this demo, you could see that all the records from pre-population are processed exactly once,
+ * in either standalone mode or group mode, with strong partition level ordering guarantee.
+ *
+ * Note: please start the kafka broker and zookeeper in local first. The broker version must be >= 2.5
+ * in order to run group mode, otherwise the app could throw
+ * {@link org.apache.kafka.common.errors.UnsupportedVersionException}.
+ */
+public class KafkaExactlyOnceDemo {
+
+    private static final String INPUT_TOPIC = "input-topic";
+    private static final String OUTPUT_TOPIC = "output-topic";
+
+    public static void main(String[] args) throws InterruptedException, ExecutionException {
+        if (args.length != 4) {
+            throw new IllegalArgumentException("Should accept 4 parameters: [mode], " +
+                "[number of partitions], [number of instances], [number of records]");
+        }
+
+        String mode = args[0];
+        int numPartitions = Integer.valueOf(args[1]);
+        int numInstances = Integer.valueOf(args[2]);
+        int numRecords = Integer.valueOf(args[3]);
+
+        /* Stage 1: topic cleanup and recreation */
+        recreateTopics(numPartitions);
+
+        CountDownLatch prePopulateLatch = new CountDownLatch(1);
+
+        /* Stage 2: pre-populate records */
+        Producer producerThread = new Producer(INPUT_TOPIC, false, null, true, numRecords, prePopulateLatch);
+        producerThread.start();
+
+        prePopulateLatch.await(5, TimeUnit.MINUTES);
+
+        CountDownLatch transactionalCopyLatch = new CountDownLatch(numInstances);
+
+        /* Stage 3: transactionally process all messages */
+        for (int instanceIdx = 0; instanceIdx < numInstances; instanceIdx++) {
+            ExactlyOnceMessageProcessor messageProcessor = new ExactlyOnceMessageProcessor(mode,
+                INPUT_TOPIC, OUTPUT_TOPIC, numPartitions,
+                numInstances, instanceIdx, transactionalCopyLatch);
+            messageProcessor.start();
+        }
+
+        transactionalCopyLatch.await(5, TimeUnit.MINUTES);
+
+        CountDownLatch consumeLatch = new CountDownLatch(1);
+
+        /* Stage 4: consume all processed messages to verify exactly once */
+        Consumer consumerThread = new Consumer(OUTPUT_TOPIC, "Verify-consumer", true, numRecords, consumeLatch);
+        consumerThread.start();
+
+        consumeLatch.await(5, TimeUnit.MINUTES);
+        consumerThread.shutdown();
+        System.out.println("All finished!");
+    }
+
+    private static void recreateTopics(final int numPartitions)
+        throws ExecutionException, InterruptedException {
+        Properties props = new Properties();
+        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
+            KafkaProperties.KAFKA_SERVER_URL + ":" + KafkaProperties.KAFKA_SERVER_PORT);
+
+        Admin adminClient = Admin.create(props);
+
+        List<String> topicsToDelete = Arrays.asList(INPUT_TOPIC, OUTPUT_TOPIC);
+
+        deleteTopic(adminClient, topicsToDelete);
+
+        // Check topic existence in a retry loop
+        while (true) {
+            System.out.println("Making sure the topics are deleted successfully: " + topicsToDelete);
+
+            Set<String> listedTopics = adminClient.listTopics().names().get();
+            System.out.println("Current list of topics: " + listedTopics);
+
+            boolean hasTopicInfo = false;
+            for (String listedTopic : listedTopics) {
+                if (topicsToDelete.contains(listedTopic)) {
+                    hasTopicInfo = true;
+                    break;
+                }
+            }
+            if (!hasTopicInfo) {
+                break;
+            }
+            Thread.sleep(1000);
+        }
+
+        // Create topics in a retry loop
+        while (true) {
+            final short replicationFactor = 1;
+            final List<NewTopic> newTopics = Arrays.asList(
+                new NewTopic(INPUT_TOPIC, numPartitions, replicationFactor),
+                new NewTopic(OUTPUT_TOPIC, numPartitions, replicationFactor));
+            try {
+                adminClient.createTopics(newTopics).all().get();
+                System.out.println("Created new topics: " + newTopics);
+                break;
+            } catch (ExecutionException e) {
+                if (!(e.getCause() instanceof TopicExistsException)) {
+                    throw e;
+                }
+                System.out.println("Metadata of the old topics are not cleared yet...");
+
+                deleteTopic(adminClient, topicsToDelete);
+
+                Thread.sleep(1000);
+            }
+        }
+    }
+
+    private static void deleteTopic(final Admin adminClient, final List<String> topicsToDelete)
+        throws InterruptedException, ExecutionException {
+        try {
+            adminClient.deleteTopics(topicsToDelete).all().get();
+        } catch (ExecutionException e) {
+            if (!(e.getCause() instanceof UnknownTopicOrPartitionException)) {
+                throw e;
+            }
+            System.out.println("Encountered exception during topic deletion: " + e.getCause());
+        }
+        System.out.println("Deleted old topics: " + topicsToDelete);
+    }
+}
diff --git a/examples/src/main/java/kafka/examples/Producer.java b/examples/src/main/java/kafka/examples/Producer.java
index b6998c5..3805dd3 100644
--- a/examples/src/main/java/kafka/examples/Producer.java
+++ b/examples/src/main/java/kafka/examples/Producer.java
@@ -25,45 +25,69 @@ import org.apache.kafka.common.serialization.IntegerSerializer;
 import org.apache.kafka.common.serialization.StringSerializer;
 
 import java.util.Properties;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
 
 public class Producer extends Thread {
     private final KafkaProducer<Integer, String> producer;
     private final String topic;
     private final Boolean isAsync;
+    private int numRecords;
+    private final CountDownLatch latch;
 
-    public Producer(String topic, Boolean isAsync) {
+    public Producer(final String topic,
+                    final Boolean isAsync,
+                    final String transactionalId,
+                    final boolean enableIdempotency,
+                    final int numRecords,
+                    final CountDownLatch latch) {
         Properties props = new Properties();
         props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KafkaProperties.KAFKA_SERVER_URL + ":" + KafkaProperties.KAFKA_SERVER_PORT);
         props.put(ProducerConfig.CLIENT_ID_CONFIG, "DemoProducer");
         props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class.getName());
         props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
+        if (transactionalId != null) {
+            props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, transactionalId);
+        }
+        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, enableIdempotency);
+
         producer = new KafkaProducer<>(props);
         this.topic = topic;
         this.isAsync = isAsync;
+        this.numRecords = numRecords;
+        this.latch = latch;
+    }
+
+    KafkaProducer<Integer, String> get() {
+        return producer;
     }
 
+    @Override
     public void run() {
-        int messageNo = 1;
-        while (true) {
-            String messageStr = "Message_" + messageNo;
+        int messageKey = 0;
+        int recordsSent = 0;
+        while (recordsSent < numRecords) {
+            String messageStr = "Message_" + messageKey;
             long startTime = System.currentTimeMillis();
             if (isAsync) { // Send asynchronously
                 producer.send(new ProducerRecord<>(topic,
-                    messageNo,
-                    messageStr), new DemoCallBack(startTime, messageNo, messageStr));
+                    messageKey,
+                    messageStr), new DemoCallBack(startTime, messageKey, messageStr));
             } else { // Send synchronously
                 try {
                     producer.send(new ProducerRecord<>(topic,
-                        messageNo,
+                        messageKey,
                         messageStr)).get();
-                    System.out.println("Sent message: (" + messageNo + ", " + messageStr + ")");
+                    System.out.println("Sent message: (" + messageKey + ", " + messageStr + ")");
                 } catch (InterruptedException | ExecutionException e) {
                     e.printStackTrace();
                 }
             }
-            ++messageNo;
+            messageKey += 2;
+            recordsSent += 1;
         }
+        System.out.println("Producer sent " + numRecords + " records successfully");
+        latch.countDown();
     }
 }
 


[kafka] 02/03: HOTFIX: Fix spotsbug failure in Kafka examples (#8051)

Posted by gu...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch 2.5
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit c853929c3a7c57f2292e676efe4286e34ddeea10
Author: Boyang Chen <bo...@confluent.io>
AuthorDate: Thu Feb 6 10:58:05 2020 -0800

    HOTFIX: Fix spotsbug failure in Kafka examples (#8051)
    
    Reviewers: Jason Gustafson <ja...@confluent.io>
---
 .../kafka/examples/KafkaConsumerProducerDemo.java    |  8 +++++++-
 .../java/kafka/examples/KafkaExactlyOnceDemo.java    | 20 ++++++++++++++------
 2 files changed, 21 insertions(+), 7 deletions(-)

diff --git a/examples/src/main/java/kafka/examples/KafkaConsumerProducerDemo.java b/examples/src/main/java/kafka/examples/KafkaConsumerProducerDemo.java
index 561732b..21d85c3 100644
--- a/examples/src/main/java/kafka/examples/KafkaConsumerProducerDemo.java
+++ b/examples/src/main/java/kafka/examples/KafkaConsumerProducerDemo.java
@@ -16,6 +16,8 @@
  */
 package kafka.examples;
 
+import org.apache.kafka.common.errors.TimeoutException;
+
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
@@ -28,7 +30,11 @@ public class KafkaConsumerProducerDemo {
 
         Consumer consumerThread = new Consumer(KafkaProperties.TOPIC, "DemoConsumer", false, 10000, latch);
         consumerThread.start();
-        latch.await(5, TimeUnit.MINUTES);
+
+        if (!latch.await(5, TimeUnit.MINUTES)) {
+            throw new TimeoutException("Timeout after 5 minutes waiting for demo producer and consumer to finish");
+        }
+
         consumerThread.shutdown();
         System.out.println("All finished!");
     }
diff --git a/examples/src/main/java/kafka/examples/KafkaExactlyOnceDemo.java b/examples/src/main/java/kafka/examples/KafkaExactlyOnceDemo.java
index d418eba..288b786 100644
--- a/examples/src/main/java/kafka/examples/KafkaExactlyOnceDemo.java
+++ b/examples/src/main/java/kafka/examples/KafkaExactlyOnceDemo.java
@@ -19,6 +19,7 @@ package kafka.examples;
 import org.apache.kafka.clients.admin.Admin;
 import org.apache.kafka.clients.admin.NewTopic;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.errors.TopicExistsException;
 import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
 
@@ -77,9 +78,9 @@ public class KafkaExactlyOnceDemo {
         }
 
         String mode = args[0];
-        int numPartitions = Integer.valueOf(args[1]);
-        int numInstances = Integer.valueOf(args[2]);
-        int numRecords = Integer.valueOf(args[3]);
+        int numPartitions = Integer.parseInt(args[1]);
+        int numInstances = Integer.parseInt(args[2]);
+        int numRecords = Integer.parseInt(args[3]);
 
         /* Stage 1: topic cleanup and recreation */
         recreateTopics(numPartitions);
@@ -90,7 +91,9 @@ public class KafkaExactlyOnceDemo {
         Producer producerThread = new Producer(INPUT_TOPIC, false, null, true, numRecords, prePopulateLatch);
         producerThread.start();
 
-        prePopulateLatch.await(5, TimeUnit.MINUTES);
+        if (!prePopulateLatch.await(5, TimeUnit.MINUTES)) {
+            throw new TimeoutException("Timeout after 5 minutes waiting for data pre-population");
+        }
 
         CountDownLatch transactionalCopyLatch = new CountDownLatch(numInstances);
 
@@ -102,7 +105,9 @@ public class KafkaExactlyOnceDemo {
             messageProcessor.start();
         }
 
-        transactionalCopyLatch.await(5, TimeUnit.MINUTES);
+        if (!transactionalCopyLatch.await(5, TimeUnit.MINUTES)) {
+            throw new TimeoutException("Timeout after 5 minutes waiting for transactionally message copy");
+        }
 
         CountDownLatch consumeLatch = new CountDownLatch(1);
 
@@ -110,7 +115,10 @@ public class KafkaExactlyOnceDemo {
         Consumer consumerThread = new Consumer(OUTPUT_TOPIC, "Verify-consumer", true, numRecords, consumeLatch);
         consumerThread.start();
 
-        consumeLatch.await(5, TimeUnit.MINUTES);
+        if (!consumeLatch.await(5, TimeUnit.MINUTES)) {
+            throw new TimeoutException("Timeout after 5 minutes waiting for output data consumption");
+        }
+
         consumerThread.shutdown();
         System.out.println("All finished!");
     }