Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2022/01/18 19:16:51 UTC

[GitHub] [kafka] philipnee opened a new pull request #11689: KAFKA-12841 patch

philipnee opened a new pull request #11689:
URL: https://github.com/apache/kafka/pull/11689


   Jira: https://issues.apache.org/jira/browse/KAFKA-12841
   
   Use the `InterceptorCallback` wrapper when an `ApiException` is thrown so that `onCompletion` adheres to the `Callback` contract and receives a valid (dummy) `TopicPartition`. Removed the "except for topicPartition" wording from the documentation in Callback.java, because we now assign a partition of -1 when the topicPartition doesn't exist. The change is based on https://issues.apache.org/jira/browse/KAFKA-3303
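
For illustration, the failure path described above can be sketched with stand-in types. `SketchMetadata`, `SketchCallback`, and `completeWithError` are hypothetical names invented for this sketch and are not part of kafka-clients. The sketch only assumes that the callback should receive non-null metadata that keeps the topic name and uses -1 for fields that were never assigned.

```java
// Stand-in for RecordMetadata: only the fields this sketch needs.
final class SketchMetadata {
    final String topic;
    final int partition;
    final long offset;

    SketchMetadata(String topic, int partition, long offset) {
        this.topic = topic;
        this.partition = partition;
        this.offset = offset;
    }
}

// Stand-in for the producer Callback interface.
interface SketchCallback {
    void onCompletion(SketchMetadata metadata, Exception exception);
}

final class FailurePathSketch {
    // Assumed placeholder for fields that were never assigned.
    static final int UNKNOWN = -1;

    // Hypothetical helper: reports a send that failed before a partition was
    // chosen, passing placeholder metadata instead of null.
    static void completeWithError(String topic, Exception error, SketchCallback callback) {
        callback.onCompletion(new SketchMetadata(topic, UNKNOWN, UNKNOWN), error);
    }

    public static void main(String[] args) {
        completeWithError("topic abc", new IllegalArgumentException("invalid topic name"),
            (metadata, exception) -> System.out.println(
                metadata.topic + " / " + metadata.partition + " / " + exception.getMessage()));
    }
}
```

With this shape, a callback can read the topic and partition without a null check on the metadata, and it can still tell the send failed because the exception is non-null.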
   
   ### Committer Checklist (excluded from commit message)
    * [ ]  Verify design and implementation
    * [ ]  Verify test coverage and CI build status
    * [ ]  Verify documentation (including upgrade notes)
    
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] philipnee commented on a change in pull request #11689: Fixed documentation and handles null topicPartition for KAFKA-12841

Posted by GitBox <gi...@apache.org>.
philipnee commented on a change in pull request #11689:
URL: https://github.com/apache/kafka/pull/11689#discussion_r797884312



##########
File path: clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
##########
@@ -1506,6 +1508,46 @@ public void testNullTopicName() {
             "key".getBytes(StandardCharsets.UTF_8), "value".getBytes(StandardCharsets.UTF_8)));
     }
 
+    @Test
+    public void testCallbackHandlesError() throws Exception {
+        Map<String, Object> configs = new HashMap<>();
+        configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9000");
+        configs.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, "1000");
+
+        Time time = new MockTime();
+        ProducerMetadata producerMetadata = newMetadata(0, Long.MAX_VALUE);
+        MockClient client = new MockClient(time, producerMetadata);
+
+        String invalidTopicName = "topic abc"; // Invalid topic name due to space
+
+        try (Producer<String, String> producer = kafkaProducer(configs, new StringSerializer(), new StringSerializer(),
+                producerMetadata, client, null, time)) {
+            ProducerRecord<String, String> record = new ProducerRecord<>(invalidTopicName, "HelloKafka");
+
+            // Here's the important piece of the test. Let's make sure that the RecordMetadata we get
+            // is non-null and adheres to the onCompletion contract.
+            Callback callBack = (recordMetadata, exception) -> {
+                assertNotNull(exception);
+                assertNotNull(recordMetadata);
+
+                assertNotNull(recordMetadata.topic(), "Topic name should be valid even on send failure");

Review comment:
       Done - see line 1545 in the test.







[GitHub] [kafka] cmccabe commented on pull request #11689: Fixed documentation and handles null topicPartition for KAFKA-12841

Posted by GitBox <gi...@apache.org>.
cmccabe commented on pull request #11689:
URL: https://github.com/apache/kafka/pull/11689#issuecomment-1028467009


   Test failures are not related to this change.





[GitHub] [kafka] cmccabe merged pull request #11689: Fixed documentation and handles null topicPartition for KAFKA-12841

Posted by GitBox <gi...@apache.org>.
cmccabe merged pull request #11689:
URL: https://github.com/apache/kafka/pull/11689


   





[GitHub] [kafka] cmccabe commented on a change in pull request #11689: Fixed documentation and handles null topicPartition for KAFKA-12841

Posted by GitBox <gi...@apache.org>.
cmccabe commented on a change in pull request #11689:
URL: https://github.com/apache/kafka/pull/11689#discussion_r788921625



##########
File path: clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerInterceptors.java
##########
@@ -123,6 +122,10 @@ public void onSendError(ProducerRecord<K, V> record, TopicPartition interceptTop
         }
     }
 
+    public static <K, V> TopicPartition createTopicPartition(ProducerRecord<K, V> record) {

Review comment:
       This is not a very good name, since "creating a partition" usually means calling `createPartitions`. How about `extractTopicPartition`?
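
A minimal sketch of what a helper under the suggested name might look like, using stand-in types so it compiles on its own. `SketchRecord`, `SketchTopicPartition`, and `TopicPartitionSketch` are hypothetical and not part of kafka-clients. The body assumes the behavior described in the pull request, namely that -1 is used when the record carries no partition.

```java
// Stand-in for ProducerRecord: the partition is null when the caller did not pick one.
final class SketchRecord {
    final String topic;
    final Integer partition;

    SketchRecord(String topic, Integer partition) {
        this.topic = topic;
        this.partition = partition;
    }
}

// Stand-in for TopicPartition.
final class SketchTopicPartition {
    final String topic;
    final int partition;

    SketchTopicPartition(String topic, int partition) {
        this.topic = topic;
        this.partition = partition;
    }
}

final class TopicPartitionSketch {
    // Assumed placeholder, matching the -1 described in the pull request.
    static final int UNKNOWN_PARTITION = -1;

    // Only reads values off the record; nothing is created on the broker,
    // which is why "extract" describes it better than "create".
    static SketchTopicPartition extractTopicPartition(SketchRecord record) {
        int partition = record.partition == null ? UNKNOWN_PARTITION : record.partition;
        return new SketchTopicPartition(record.topic, partition);
    }
}
```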







[GitHub] [kafka] cmccabe commented on a change in pull request #11689: Fixed documentation and handles null topicPartition for KAFKA-12841

Posted by GitBox <gi...@apache.org>.
cmccabe commented on a change in pull request #11689:
URL: https://github.com/apache/kafka/pull/11689#discussion_r788922915



##########
File path: clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
##########
@@ -1506,6 +1508,56 @@ public void testNullTopicName() {
             "key".getBytes(StandardCharsets.UTF_8), "value".getBytes(StandardCharsets.UTF_8)));
     }
 
+    @Test
+    public void testCallbackHandlesError() throws Exception {
+        Map<String, Object> configs = new HashMap<>();
+        configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9000");
+        configs.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, "1000");
+
+        Time time = new MockTime();
+        ProducerMetadata producerMetadata = newMetadata(0, Long.MAX_VALUE);
+        MockClient client = new MockClient(time, producerMetadata);
+
+        String invalidTopicName = "topic abc"; // Invalid topic name due to space
+
+        try (Producer<String, String> producer = kafkaProducer(configs, new StringSerializer(), new StringSerializer(),
+                producerMetadata, client, null, time)) {
+            ProducerRecord<String, String> record = new ProducerRecord<>(invalidTopicName, "HelloKafka");
+
+            // Here's the important piece of the test. Let's make sure that the RecordMetadata we get
+            // is non-null and adheres to the onCompletion contract.
+            Callback callBack = (recordMetadata, exception) -> {
+                assertNotNull(exception);
+                assertNotNull(recordMetadata);
+
+                try {
+                    assertNotNull(recordMetadata.topic());
+                } catch (NullPointerException e) {
+                    fail("Topic name should be valid even on send failure", e);

Review comment:
       It's not necessary to do this. If you want to display a special error message when the assert fails, there is a form of `assertNotNull` that takes the message as an additional argument.

##########
File path: clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
##########
@@ -1506,6 +1508,56 @@ public void testNullTopicName() {
             "key".getBytes(StandardCharsets.UTF_8), "value".getBytes(StandardCharsets.UTF_8)));
     }
 
+    @Test
+    public void testCallbackHandlesError() throws Exception {
+        Map<String, Object> configs = new HashMap<>();
+        configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9000");
+        configs.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, "1000");
+
+        Time time = new MockTime();
+        ProducerMetadata producerMetadata = newMetadata(0, Long.MAX_VALUE);
+        MockClient client = new MockClient(time, producerMetadata);
+
+        String invalidTopicName = "topic abc"; // Invalid topic name due to space
+
+        try (Producer<String, String> producer = kafkaProducer(configs, new StringSerializer(), new StringSerializer(),
+                producerMetadata, client, null, time)) {
+            ProducerRecord<String, String> record = new ProducerRecord<>(invalidTopicName, "HelloKafka");
+
+            // Here's the important piece of the test. Let's make sure that the RecordMetadata we get
+            // is non-null and adheres to the onCompletion contract.
+            Callback callBack = (recordMetadata, exception) -> {
+                assertNotNull(exception);
+                assertNotNull(recordMetadata);
+
+                try {
+                    assertNotNull(recordMetadata.topic());
+                } catch (NullPointerException e) {
+                    fail("Topic name should be valid even on send failure", e);
+                }
+
+                assertEquals(invalidTopicName, recordMetadata.topic());
+
+                try {
+                    assertEquals(RecordMetadata.UNKNOWN_PARTITION, recordMetadata.partition());

Review comment:
       As before, it's not necessary to do this. If you want to display a special error message when the assert fails, there is a three-argument form of `assertEquals` that lets you specify the error message.







[GitHub] [kafka] cmccabe commented on a change in pull request #11689: Fixed documentation and handles null topicPartition for KAFKA-12841

Posted by GitBox <gi...@apache.org>.
cmccabe commented on a change in pull request #11689:
URL: https://github.com/apache/kafka/pull/11689#discussion_r788919892



##########
File path: clients/src/main/java/org/apache/kafka/clients/producer/Callback.java
##########
@@ -25,10 +25,10 @@
     /**
      * A callback method the user can implement to provide asynchronous handling of request completion. This method will
      * be called when the record sent to the server has been acknowledged. When exception is not null in the callback,
-     * metadata will contain the special -1 value for all fields except for topicPartition, which will be valid.
+     * metadata will contain the special -1 value for all fields.

Review comment:
       Please add a sentence specifying that we will return a partition ID of -1 if no partition could be chosen.







[GitHub] [kafka] cmccabe commented on a change in pull request #11689: Fixed documentation and handles null topicPartition for KAFKA-12841

Posted by GitBox <gi...@apache.org>.
cmccabe commented on a change in pull request #11689:
URL: https://github.com/apache/kafka/pull/11689#discussion_r797873026



##########
File path: clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
##########
@@ -1506,6 +1508,46 @@ public void testNullTopicName() {
             "key".getBytes(StandardCharsets.UTF_8), "value".getBytes(StandardCharsets.UTF_8)));
     }
 
+    @Test
+    public void testCallbackHandlesError() throws Exception {
+        Map<String, Object> configs = new HashMap<>();
+        configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9000");
+        configs.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, "1000");
+
+        Time time = new MockTime();
+        ProducerMetadata producerMetadata = newMetadata(0, Long.MAX_VALUE);
+        MockClient client = new MockClient(time, producerMetadata);
+
+        String invalidTopicName = "topic abc"; // Invalid topic name due to space
+
+        try (Producer<String, String> producer = kafkaProducer(configs, new StringSerializer(), new StringSerializer(),
+                producerMetadata, client, null, time)) {
+            ProducerRecord<String, String> record = new ProducerRecord<>(invalidTopicName, "HelloKafka");
+
+            // Here's the important piece of the test. Let's make sure that the RecordMetadata we get
+            // is non-null and adheres to the onCompletion contract.
+            Callback callBack = (recordMetadata, exception) -> {
+                assertNotNull(exception);
+                assertNotNull(recordMetadata);
+
+                assertNotNull(recordMetadata.topic(), "Topic name should be valid even on send failure");

Review comment:
       Can you please also check that the partition ID gets set to -1?



