Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/11/16 21:24:15 UTC

[GitHub] [kafka] C0urante opened a new pull request #11508: KAFKA-9279: Fail producer transactions for asynchronously-reported, synchronously-encountered ApiExceptions

C0urante opened a new pull request #11508:
URL: https://github.com/apache/kafka/pull/11508


   Some exceptions that the producer reports are encountered synchronously during a call to `KafkaProducer::send`, but are reported asynchronously via the resulting `Future` instead of being thrown directly from `send`. One common example is a record whose size exceeds the client-side `max.request.size` configuration property; when that happens, a failed future is returned immediately from `KafkaProducer::send`.
   
   This is all fine, but the current behavior does not take into account the guarantees surrounding the transactional producer's `commitTransaction` method, which are that:
   > ...if any of the `send(ProducerRecord)` calls which were part of the transaction hit irrecoverable errors, this method will throw the last received exception immediately and the transaction will not be committed. So all `send(ProducerRecord)` calls in a transaction must succeed in order for this method to succeed.
   
   The changes in this PR cause producers to fail on `commitTransaction` if any prior calls to `send` resulted in one of these asynchronously-reported, synchronously-encountered exceptions.
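
To make the intended behavior concrete, here is a minimal, self-contained sketch in plain Java. It is a toy model and not Kafka's implementation: `ToyTransactionalProducer`, its fields, and the exception types it uses are all hypothetical stand-ins, and the size check only looks at the value's UTF-8 length. It shows a `send` that returns an already-failed future rather than throwing, and a `commitTransaction` that refuses to commit once such a failure has been recorded.

```java
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CompletableFuture;

/** Toy stand-in for a transactional producer; hypothetical, not Kafka code. */
final class ToyTransactionalProducer {
    // Plays the role of the client-side size limit (assumption: value bytes only).
    private final int maxRequestSize;
    // Last error hit synchronously inside send() during the current transaction.
    private RuntimeException lastSendError;

    ToyTransactionalProducer(int maxRequestSize) {
        this.maxRequestSize = maxRequestSize;
    }

    void beginTransaction() {
        lastSendError = null; // a new transaction starts with a clean slate
    }

    /** Never throws for an oversized value; the error travels in the future. */
    CompletableFuture<Void> send(String value) {
        CompletableFuture<Void> result = new CompletableFuture<>();
        int size = value.getBytes(StandardCharsets.UTF_8).length;
        if (size > maxRequestSize) {
            RuntimeException error =
                new IllegalArgumentException("record too large: " + size + " bytes");
            lastSendError = error;               // remember it so commit can see it
            result.completeExceptionally(error); // report it asynchronously
        } else {
            result.complete(null);
        }
        return result;
    }

    /** Fails if any send in this transaction was rejected synchronously. */
    void commitTransaction() {
        if (lastSendError != null) {
            throw new IllegalStateException(
                "cannot commit: a send in this transaction failed", lastSendError);
        }
    }

    public static void main(String[] args) {
        ToyTransactionalProducer producer = new ToyTransactionalProducer(10);
        producer.beginTransaction();
        CompletableFuture<Void> failed = producer.send("a value well over ten bytes");
        System.out.println("send failed asynchronously: " + failed.isCompletedExceptionally());
        try {
            producer.commitTransaction();
        } catch (IllegalStateException e) {
            System.out.println("commit rejected: " + e.getCause().getMessage());
        }
    }
}
```

With this shape, a caller that ignores the returned futures still learns about the failure at commit time, which is the guarantee quoted above.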
   
   An alternative approach would be to throw these exceptions directly to the caller from `send`, but that would alter producer behavior for all users rather than only those using transactional producers; that tradeoff seems less desirable just to fix an issue that only affects transactional producers.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] mimaison merged pull request #11508: KAFKA-9279: Fail producer transactions for asynchronously-reported, synchronously-encountered ApiExceptions

Posted by GitBox <gi...@apache.org>.
mimaison merged pull request #11508:
URL: https://github.com/apache/kafka/pull/11508


   





[GitHub] [kafka] mimaison commented on a change in pull request #11508: KAFKA-9279: Fail producer transactions for asynchronously-reported, synchronously-encountered ApiExceptions

Posted by GitBox <gi...@apache.org>.
mimaison commented on a change in pull request #11508:
URL: https://github.com/apache/kafka/pull/11508#discussion_r791977277



##########
File path: clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
##########
@@ -1019,6 +1022,149 @@ public void testMeasureAbortTransactionDuration() {
         }
     }
 
+    @Test
+    public void testCommitTransactionWithRecordTooLargeException() throws Exception {
+        Map<String, Object> configs = new HashMap<>();
+        configs.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "some.id");
+        configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9000");
+        configs.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, 1000);
+        Time time = new MockTime(1);
+        MetadataResponse initialUpdateResponse = RequestTestUtils.metadataUpdateWith(1, singletonMap("topic", 1));
+        ProducerMetadata metadata = mock(ProducerMetadata.class);
+        MockClient client = new MockClient(time, metadata);
+        client.updateMetadata(initialUpdateResponse);
+        client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, "some.id", host1));
+        client.prepareResponse(initProducerIdResponse(1L, (short) 5, Errors.NONE));
+
+        when(metadata.fetch()).thenReturn(onePartitionCluster);
+
+        String largeString = IntStream.range(0, 1000).mapToObj(i -> "**********").collect(Collectors.joining());

Review comment:
       We could get away with a single `*`
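
A rough check of why one asterisk per repetition would suffice, written as a standalone Java sketch rather than test code from the PR: 1000 single-character repetitions give a 1000-byte value, and together with the 12-byte key `"large string"` the payload is already past the limit of 1000 that the test sets via `MAX_REQUEST_SIZE_CONFIG`. The class and method names are hypothetical, and the sketch counts only key and value bytes; the assumption is that the producer's real size estimate is at least that large, since it would also include record overhead.

```java
import java.nio.charset.StandardCharsets;
import java.util.Collections;

/** Hypothetical helper for sizing the test payload; not part of the PR. */
final class LargeValueSketch {
    // The value the test passes for MAX_REQUEST_SIZE_CONFIG.
    static final int MAX_REQUEST_SIZE = 1000;

    /** Builds a string of `count` asterisks, one character per repetition. */
    static String stars(int count) {
        return String.join("", Collections.nCopies(count, "*"));
    }

    /** Key plus value bytes only: a lower bound on the serialized record size. */
    static int payloadBytes(String key, String value) {
        return key.getBytes(StandardCharsets.UTF_8).length
            + value.getBytes(StandardCharsets.UTF_8).length;
    }

    public static void main(String[] args) {
        int bytes = payloadBytes("large string", stars(1000));
        System.out.println("payload bytes: " + bytes
            + ", exceeds limit: " + (bytes > MAX_REQUEST_SIZE));
    }
}
```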

##########
File path: clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
##########
@@ -1019,6 +1022,149 @@ public void testMeasureAbortTransactionDuration() {
         }
     }
 
+    @Test
+    public void testCommitTransactionWithRecordTooLargeException() throws Exception {
+        Map<String, Object> configs = new HashMap<>();
+        configs.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "some.id");
+        configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9000");
+        configs.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, 1000);
+        Time time = new MockTime(1);
+        MetadataResponse initialUpdateResponse = RequestTestUtils.metadataUpdateWith(1, singletonMap("topic", 1));
+        ProducerMetadata metadata = mock(ProducerMetadata.class);
+        MockClient client = new MockClient(time, metadata);
+        client.updateMetadata(initialUpdateResponse);
+        client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, "some.id", host1));
+        client.prepareResponse(initProducerIdResponse(1L, (short) 5, Errors.NONE));
+
+        when(metadata.fetch()).thenReturn(onePartitionCluster);
+
+        String largeString = IntStream.range(0, 1000).mapToObj(i -> "**********").collect(Collectors.joining());
+        ProducerRecord<String, String> largeRecord = new ProducerRecord<>(topic, "large string", largeString);
+
+        try (KafkaProducer<String, String> producer = kafkaProducer(configs, new StringSerializer(),
+                new StringSerializer(), metadata, client, null, time)) {
+            producer.initTransactions();
+
+            client.prepareResponse(endTxnResponse(Errors.NONE));
+            producer.beginTransaction();
+            TestUtils.assertFutureError(producer.send(largeRecord), RecordTooLargeException.class);
+            assertThrows(KafkaException.class, producer::commitTransaction);
+        }
+    }
+
+    @Test
+    public void testCommitTransactionWithMetadataTimeoutForMissingTopic() throws Exception {
+        Map<String, Object> configs = new HashMap<>();
+        configs.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "some.id");
+        configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
+        configs.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 60000);
+
+        // Create a record for a not-yet-created topic
+        ProducerRecord<String, String> record = new ProducerRecord<>(topic, 2, null, "value");

Review comment:
       We can use `new ProducerRecord<>(topic, "value");` to simplify it a tiny bit







[GitHub] [kafka] C0urante commented on a change in pull request #11508: KAFKA-9279: Fail producer transactions for asynchronously-reported, synchronously-encountered ApiExceptions

Posted by GitBox <gi...@apache.org>.
C0urante commented on a change in pull request #11508:
URL: https://github.com/apache/kafka/pull/11508#discussion_r792021213



##########
File path: clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
##########
@@ -1019,6 +1022,149 @@ public void testMeasureAbortTransactionDuration() {
         }
     }
 
+    @Test
+    public void testCommitTransactionWithRecordTooLargeException() throws Exception {
+        Map<String, Object> configs = new HashMap<>();
+        configs.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "some.id");
+        configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9000");
+        configs.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, 1000);
+        Time time = new MockTime(1);
+        MetadataResponse initialUpdateResponse = RequestTestUtils.metadataUpdateWith(1, singletonMap("topic", 1));
+        ProducerMetadata metadata = mock(ProducerMetadata.class);
+        MockClient client = new MockClient(time, metadata);
+        client.updateMetadata(initialUpdateResponse);
+        client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, "some.id", host1));
+        client.prepareResponse(initProducerIdResponse(1L, (short) 5, Errors.NONE));
+
+        when(metadata.fetch()).thenReturn(onePartitionCluster);
+
+        String largeString = IntStream.range(0, 1000).mapToObj(i -> "**********").collect(Collectors.joining());

Review comment:
       Haha fair, will do.

##########
File path: clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
##########
@@ -1019,6 +1022,149 @@ public void testMeasureAbortTransactionDuration() {
         }
     }
 
+    @Test
+    public void testCommitTransactionWithRecordTooLargeException() throws Exception {
+        Map<String, Object> configs = new HashMap<>();
+        configs.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "some.id");
+        configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9000");
+        configs.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, 1000);
+        Time time = new MockTime(1);
+        MetadataResponse initialUpdateResponse = RequestTestUtils.metadataUpdateWith(1, singletonMap("topic", 1));
+        ProducerMetadata metadata = mock(ProducerMetadata.class);
+        MockClient client = new MockClient(time, metadata);
+        client.updateMetadata(initialUpdateResponse);
+        client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, "some.id", host1));
+        client.prepareResponse(initProducerIdResponse(1L, (short) 5, Errors.NONE));
+
+        when(metadata.fetch()).thenReturn(onePartitionCluster);
+
+        String largeString = IntStream.range(0, 1000).mapToObj(i -> "**********").collect(Collectors.joining());
+        ProducerRecord<String, String> largeRecord = new ProducerRecord<>(topic, "large string", largeString);
+
+        try (KafkaProducer<String, String> producer = kafkaProducer(configs, new StringSerializer(),
+                new StringSerializer(), metadata, client, null, time)) {
+            producer.initTransactions();
+
+            client.prepareResponse(endTxnResponse(Errors.NONE));
+            producer.beginTransaction();
+            TestUtils.assertFutureError(producer.send(largeRecord), RecordTooLargeException.class);
+            assertThrows(KafkaException.class, producer::commitTransaction);
+        }
+    }
+
+    @Test
+    public void testCommitTransactionWithMetadataTimeoutForMissingTopic() throws Exception {
+        Map<String, Object> configs = new HashMap<>();
+        configs.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "some.id");
+        configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
+        configs.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 60000);
+
+        // Create a record for a not-yet-created topic
+        ProducerRecord<String, String> record = new ProducerRecord<>(topic, 2, null, "value");

Review comment:
       Good call, done.







[GitHub] [kafka] C0urante commented on pull request #11508: KAFKA-9279: Fail producer transactions for asynchronously-reported, synchronously-encountered ApiExceptions

Posted by GitBox <gi...@apache.org>.
C0urante commented on pull request #11508:
URL: https://github.com/apache/kafka/pull/11508#issuecomment-1021504481


   Thanks @mimaison, appreciate it! This should make KIP-618 a little easier to implement (see the note [here](https://github.com/apache/kafka/pull/10907#discussion_r786314621) regarding the workaround I've had to employ right now).





[GitHub] [kafka] C0urante commented on pull request #11508: KAFKA-9279: Fail producer transactions for asynchronously-reported, synchronously-encountered ApiExceptions

Posted by GitBox <gi...@apache.org>.
C0urante commented on pull request #11508:
URL: https://github.com/apache/kafka/pull/11508#issuecomment-972219334


   @ableegoldman @guozhangwang @mimaison would any of you be able to take a look?

