Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/08/09 14:18:07 UTC

[GitHub] [kafka] lbradstreet opened a new pull request #9147: MINOR: supervise TransactionalMessageCopier producer

lbradstreet opened a new pull request #9147:
URL: https://github.com/apache/kafka/pull/9147


   transactions_test and group_mode_transactions_test have proven quite
   brittle around timeouts (see 67f5b5de77d67c02edb335737215312d099a1cac,
   e099b58df5b3e4f87173fc55880f9c343308739f,
   d9fe30dab0fc56318b012731c348ed1ddae2ec04,
   07db26c20fcbccbf758591607864f7fd4bd8975f). Patching timeouts one by one
   is a losing battle, especially if we want to expand the kinds of nemesis
   checks we perform on transaction support (e.g. iptables-based
   partitioning). This PR instead creates a new producer when the prior one
   is in an unrecoverable state, allowing us to keep testing the required
   EOS invariants, as sketched below.
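   A minimal sketch of the supervision idea (hedged: copyBatch and the exact
   wiring are illustrative stand-ins, not the PR verbatim; createProducer and
   resetToLastCommittedPositions mirror helpers that exist in
   TransactionalMessageCopier):

```java
import java.time.Duration;
import java.util.concurrent.atomic.AtomicReference;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.common.errors.OutOfOrderSequenceException;
import org.apache.kafka.common.errors.ProducerFencedException;

AtomicReference<KafkaProducer<String, String>> producer =
        new AtomicReference<>(createProducer(parsedArgs));

while (remainingMessages.get() > 0) {
    try {
        // beginTransaction / send / sendOffsetsToTransaction / commitTransaction
        copyBatch(producer.get());
    } catch (ProducerFencedException | OutOfOrderSequenceException e) {
        // The current producer is in an unrecoverable state. Rather than fail
        // the whole test run, replace it and rewind the consumer to the last
        // committed offsets, so the EOS invariant (an exact copy of the input
        // topic on the output topic) can still be verified.
        producer.get().close(Duration.ZERO);
        producer.set(createProducer(parsedArgs));
        resetToLastCommittedPositions(consumer);
    }
}
```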


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] lbradstreet commented on a change in pull request #9147: MINOR: supervise TransactionalMessageCopier producer

Posted by GitBox <gi...@apache.org>.
lbradstreet commented on a change in pull request #9147:
URL: https://github.com/apache/kafka/pull/9147#discussion_r467590975



##########
File path: tools/src/main/java/org/apache/kafka/tools/TransactionalMessageCopier.java
##########
@@ -345,39 +357,55 @@ public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                 ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(200));
                 if (records.count() > 0) {
                     try {
-                        producer.beginTransaction();
-
-                        for (ConsumerRecord<String, String> record : records) {
-                            producer.send(producerRecordFromConsumerRecord(outputTopic, record));
-                        }
-
-                        long messagesSentWithinCurrentTxn = records.count();
-
-                        if (useGroupMetadata) {
-                            producer.sendOffsetsToTransaction(consumerPositions(consumer), consumer.groupMetadata());
-                        } else {
-                            producer.sendOffsetsToTransaction(consumerPositions(consumer), consumerGroup);
-                        }
-
-                        if (enableRandomAborts && random.nextInt() % 3 == 0) {
-                            throw new KafkaException("Aborting transaction");
-                        } else {
-                            producer.commitTransaction();
-                            remainingMessages.getAndAdd(-messagesSentWithinCurrentTxn);
-                            numMessagesProcessedSinceLastRebalance.getAndAdd(messagesSentWithinCurrentTxn);
-                            totalMessageProcessed.getAndAdd(messagesSentWithinCurrentTxn);
+                        try {
+                            producer.get().beginTransaction();
+
+                            for (ConsumerRecord<String, String> record : records) {
+                                producer.get().send(producerRecordFromConsumerRecord(outputTopic, record));
+                            }
+
+                            long messagesSentWithinCurrentTxn = records.count();
+
+                            if (useGroupMetadata) {
+                                producer.get().sendOffsetsToTransaction(consumerPositions(consumer),
+                                        consumer.groupMetadata());
+                            } else {
+                                producer.get().sendOffsetsToTransaction(consumerPositions(consumer),
+                                        consumerGroup);
+                            }
+
+                            if (enableRandomAborts && random.nextInt() % 3 == 0) {
+                                throw new KafkaException("Aborting transaction");
+                            } else {
+                                producer.get().commitTransaction();
+                                remainingMessages.getAndAdd(-messagesSentWithinCurrentTxn);
+                                numMessagesProcessedSinceLastRebalance.getAndAdd(messagesSentWithinCurrentTxn);
+                                totalMessageProcessed.getAndAdd(messagesSentWithinCurrentTxn);
+                            }
+                        } catch (ProducerFencedException | OutOfOrderSequenceException e) {
+                            // handle these exceptions in the outer exception handler
+                            throw e;
+                        } catch (KafkaException e) {
+                            // this may throw a ProducerFencedException on recovery;
+                            // it will be handled in the outer catch if necessary
+                            System.out.println(handledExceptionJson(totalMessageProcessed.get(),
+                                    numMessagesProcessedSinceLastRebalance.get(), remainingMessages.get(), transactionalId, e));
+                            producer.get().abortTransaction();
+                            resetToLastCommittedPositions(consumer);
                         }
                     } catch (ProducerFencedException | OutOfOrderSequenceException e) {
-                        // We cannot recover from these errors, so just rethrow them and let the process fail
-                        throw e;
-                    } catch (KafkaException e) {
-                        producer.abortTransaction();
+                        // These failures are not recoverable with the same producer

Review comment:
       I think this is a reasonable way to deal with these issues, since the main thing we are trying to test here is that we produce an exact copy from the input topic to the output topic. However, I do have questions about what kinds of exceptions we should allow for. Originally my change allowed for supervision of ProducerFencedExceptions but not OutOfOrderSequenceExceptions. If we do not expect the latter in any case where we are messing with cluster behavior (broker bounces, network partitioning, etc.), then I can take the handling out for the OutOfOrderSequence case, as in the sketch below.
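       For discussion, the narrower supervision policy might look roughly like this (a sketch only, not the PR's code; runCopyIteration() and recreateProducer() are hypothetical helpers, the latter closing the fenced producer and swapping a fresh one into the AtomicReference):

```java
try {
    runCopyIteration(producer.get()); // hypothetical stand-in for one copy/commit pass
} catch (ProducerFencedException e) {
    // Expected when a bounce or rebalance hands the transactional.id to a
    // newer producer instance: safe to supervise by recreating the producer.
    recreateProducer(); // hypothetical helper
    resetToLastCommittedPositions(consumer);
} catch (OutOfOrderSequenceException e) {
    // If sequence errors are never expected under broker bounces or network
    // partitions, rethrow so the process fails and the test surfaces the bug.
    throw e;
}
```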







[GitHub] [kafka] lbradstreet commented on a change in pull request #9147: MINOR: supervise TransactionalMessageCopier producer

Posted by GitBox <gi...@apache.org>.
lbradstreet commented on a change in pull request #9147:
URL: https://github.com/apache/kafka/pull/9147#discussion_r468940704



##########
File path: tools/src/main/java/org/apache/kafka/tools/TransactionalMessageCopier.java
##########
@@ -284,7 +296,7 @@ public static void main(String[] args) {
 
         String consumerGroup = parsedArgs.getString("consumerGroup");
 
-        final KafkaProducer<String, String> producer = createProducer(parsedArgs);
+        AtomicReference<KafkaProducer<String, String>> producer = new AtomicReference<>(createProducer(parsedArgs));

Review comment:
       Volatile fields can't be local variables, which is why this uses an AtomicReference instead. Arguably this could get refactored a little if we don't want to call get() everywhere; a quick illustration of the constraint is below.
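       A short sketch of why the AtomicReference is needed (hedged: the subscribe call and listener body are illustrative, not the tool's exact code):

```java
// Assumes the usual imports: java.util.Collection, java.util.Collections,
// java.util.concurrent.atomic.AtomicReference, org.apache.kafka.common.TopicPartition,
// org.apache.kafka.clients.consumer.ConsumerRebalanceListener,
// org.apache.kafka.clients.producer.KafkaProducer.

// Does not compile: local variables cannot be volatile, and a local that is
// later reassigned cannot be captured by the rebalance listener below.
// volatile KafkaProducer<String, String> producer = createProducer(parsedArgs);

// Works: the reference itself is effectively final; only its contents change.
AtomicReference<KafkaProducer<String, String>> producer =
        new AtomicReference<>(createProducer(parsedArgs));

consumer.subscribe(Collections.singleton(inputTopic), new ConsumerRebalanceListener() {
    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        // Legal capture: 'producer' is never reassigned; only set() is called on it.
        producer.get().flush();
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) { }
});
```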



