Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/07/09 17:30:53 UTC

[GitHub] [kafka] rajinisivaram commented on a change in pull request #11002: KAFKA-13026: Idempotent producer (KAFKA-10619) follow-up testings

rajinisivaram commented on a change in pull request #11002:
URL: https://github.com/apache/kafka/pull/11002#discussion_r667103591



##########
File path: clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
##########
@@ -172,6 +173,115 @@ public void testOverwriteAcksAndRetriesForIdempotentProducers() {
                 config.getString(ProducerConfig.TRANSACTIONAL_ID_CONFIG)));
     }
 
+    @Test
+    public void testAcksAndIdempotenceForIdempotentProducers() {
+        Properties validProps = new Properties() {{
+            setProperty(
+                ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
+            setProperty(
+                ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
+            setProperty(
+                ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

Review comment:
       We could move the common configs into a base `Properties` instance and call `putAll()` in each case, instead of repeating these three configs every time.
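
       A minimal sketch of that suggestion, using only `java.util.Properties` so it runs without the Kafka client on the classpath. The class name `BasePropsSketch` and the helpers `baseProps()` and `withOverrides()` are hypothetical, and the plain string keys stand in for the `ProducerConfig` constants used in the test:

```java
import java.util.Properties;

class BasePropsSketch {
    // Hypothetical helper: the three configs that every case in the test repeats.
    static Properties baseProps() {
        Properties base = new Properties();
        base.setProperty("bootstrap.servers", "localhost:9999");
        base.setProperty("key.serializer",
            "org.apache.kafka.common.serialization.StringSerializer");
        base.setProperty("value.serializer",
            "org.apache.kafka.common.serialization.StringSerializer");
        return base;
    }

    // Hypothetical helper: copy the base with putAll(), then add case-specific pairs.
    static Properties withOverrides(Properties base, String... keyValues) {
        if (keyValues.length % 2 != 0) {
            throw new IllegalArgumentException("expected key/value pairs");
        }
        Properties props = new Properties();
        props.putAll(base); // the base instance itself is left untouched
        for (int i = 0; i < keyValues.length; i += 2) {
            props.setProperty(keyValues[i], keyValues[i + 1]);
        }
        return props;
    }

    public static void main(String[] args) {
        Properties base = baseProps();
        // Each case states only what differs from the shared base.
        Properties validProps = withOverrides(base, "acks", "0", "enable.idempotence", "false");
        Properties validProps2 = withOverrides(base, "transactional.id", "transactionalId");
        System.out.println(validProps.size() + " " + validProps2.size());
    }
}
```

       In the test itself, each case would then pass the resulting `Properties` to `new ProducerConfig(...)` as it does today.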

##########
File path: clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
##########
@@ -172,6 +173,115 @@ public void testOverwriteAcksAndRetriesForIdempotentProducers() {
                 config.getString(ProducerConfig.TRANSACTIONAL_ID_CONFIG)));
     }
 
+    @Test
+    public void testAcksAndIdempotenceForIdempotentProducers() {
+        Properties validProps = new Properties() {{
+            setProperty(
+                ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
+            setProperty(
+                ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
+            setProperty(
+                ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
+            setProperty(
+                ProducerConfig.ACKS_CONFIG, "0");
+            setProperty(
+                ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "false");
+        }};
+        ProducerConfig config = new ProducerConfig(validProps);
+        assertFalse(
+            config.getBoolean(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG),
+            "idempotence should be overwritten");
+        assertEquals(
+            config.getString(ProducerConfig.ACKS_CONFIG),
+            "0",
+            "acks should be overwritten");
+
+        Properties validProps2 = new Properties() {{
+            setProperty(
+                ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
+            setProperty(
+                ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
+            setProperty(
+                ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
+            setProperty(
+                ProducerConfig.TRANSACTIONAL_ID_CONFIG, "transactionalId");
+        }};
+        assertDoesNotThrow(
+            () -> new ProducerConfig(validProps2),
+            "Idempotence should be enable by default, " +
+                "ack should be set to all by default, " +
+                "thus specifying a transaction id should be ok");

Review comment:
       Could we assert the resulting values of ENABLE_IDEMPOTENCE_CONFIG and ACKS_CONFIG here instead of using `assertDoesNotThrow`?

##########
File path: clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
##########
@@ -172,6 +173,115 @@ public void testOverwriteAcksAndRetriesForIdempotentProducers() {
                 config.getString(ProducerConfig.TRANSACTIONAL_ID_CONFIG)));
     }
 
+    @Test
+    public void testAcksAndIdempotenceForIdempotentProducers() {
+        Properties validProps = new Properties() {{
+            setProperty(
+                ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
+            setProperty(
+                ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
+            setProperty(
+                ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
+            setProperty(
+                ProducerConfig.ACKS_CONFIG, "0");
+            setProperty(
+                ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "false");
+        }};
+        ProducerConfig config = new ProducerConfig(validProps);
+        assertFalse(
+            config.getBoolean(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG),
+            "idempotence should be overwritten");
+        assertEquals(
+            config.getString(ProducerConfig.ACKS_CONFIG),
+            "0",
+            "acks should be overwritten");
+
+        Properties validProps2 = new Properties() {{
+            setProperty(
+                ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
+            setProperty(
+                ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
+            setProperty(
+                ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
+            setProperty(
+                ProducerConfig.TRANSACTIONAL_ID_CONFIG, "transactionalId");
+        }};
+        assertDoesNotThrow(
+            () -> new ProducerConfig(validProps2),
+            "Idempotence should be enable by default, " +
+                "ack should be set to all by default, " +
+                "thus specifying a transaction id should be ok");

Review comment:
       We can check the values of idempotence and acks instead of `assertDoesNotThrow`

##########
File path: clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
##########
@@ -172,6 +173,115 @@ public void testOverwriteAcksAndRetriesForIdempotentProducers() {
                 config.getString(ProducerConfig.TRANSACTIONAL_ID_CONFIG)));
     }
 
+    @Test
+    public void testAcksAndIdempotenceForIdempotentProducers() {
+        Properties validProps = new Properties() {{
+            setProperty(
+                ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
+            setProperty(
+                ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
+            setProperty(
+                ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
+            setProperty(
+                ProducerConfig.ACKS_CONFIG, "0");
+            setProperty(
+                ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "false");

Review comment:
       nit: the line breaks inside these `setProperty(...)` calls are unnecessary.
