You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2018/11/15 23:10:11 UTC

[kafka] branch 2.1 updated: KAFKA-7584: StreamsConfig throws ClassCastException if max.in.flight.request.per.connect is specified as String (#5874)

This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch 2.1
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.1 by this push:
     new d8e0f43  KAFKA-7584: StreamsConfig throws ClassCastException if max.in.flight.request.per.connect is specified as String (#5874)
d8e0f43 is described below

commit d8e0f4364eb5514c12dfe7e5f64145a4f1064158
Author: Matthias J. Sax <mj...@apache.org>
AuthorDate: Thu Nov 15 13:26:58 2018 -0800

    KAFKA-7584: StreamsConfig throws ClassCastException if max.in.flight.request.per.connect is specified as String (#5874)
    
    Reviewers: Bill Bejeck <bi...@confluent.io>, John Roesler <jo...@confluent.io>, Guozhang Wang <wa...@gmail.com>
---
 .../org/apache/kafka/streams/StreamsConfig.java    | 26 +++++++++++++++--
 .../apache/kafka/streams/StreamsConfigTest.java    | 34 +++++++++++++++++-----
 2 files changed, 50 insertions(+), 10 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
index b112a54..7b8f2d7 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
@@ -886,10 +886,30 @@ public class StreamsConfig extends AbstractConfig {
         // consumer/producer configurations, log a warning and remove the user defined value from the Map.
         // Thus the default values for these consumer/producer configurations that are suitable for
         // Streams will be used instead.
-        final Object maxInFlightRequests = clientProvidedProps.get(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION);
-        if (eosEnabled && maxInFlightRequests != null && 5 < (int) maxInFlightRequests) {
-            throw new ConfigException(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION + " can't exceed 5 when using the idempotent producer");
+
+        if (eosEnabled) {
+            final Object maxInFlightRequests = clientProvidedProps.get(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION);
+
+            if (maxInFlightRequests != null) {
+                final int maxInFlightRequestsAsInteger;
+                if (maxInFlightRequests instanceof Integer) {
+                    maxInFlightRequestsAsInteger = (Integer) maxInFlightRequests;
+                } else if (maxInFlightRequests instanceof String) {
+                    try {
+                        maxInFlightRequestsAsInteger = Integer.parseInt(((String) maxInFlightRequests).trim());
+                    } catch (final NumberFormatException e) {
+                        throw new ConfigException(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, maxInFlightRequests, "String value could not be parsed as 32-bit integer");
+                    }
+                } else {
+                    throw new ConfigException(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, maxInFlightRequests, "Expected value to be a 32-bit integer, but it was a " + maxInFlightRequests.getClass().getName());
+                }
+
+                if (maxInFlightRequestsAsInteger > 5) {
+                    throw new ConfigException(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, maxInFlightRequestsAsInteger, "Can't exceed 5 when exactly-once processing is enabled");
+                }
+            }
         }
+
         for (final String config: nonConfigurableConfigs) {
             if (clientProvidedProps.containsKey(config)) {
                 final String eosMessage =  PROCESSING_GUARANTEE_CONFIG + " is set to " + EXACTLY_ONCE + ". Hence, ";
diff --git a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
index 83279cc..654fcfb 100644
--- a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
@@ -123,7 +123,7 @@ public class StreamsConfigTest {
         assertEquals(StreamsPartitionAssignor.class.getName(), returnedProps.get(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG));
         assertEquals(7L, returnedProps.get(StreamsConfig.WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG));
         assertEquals("dummy:host", returnedProps.get(StreamsConfig.APPLICATION_SERVER_CONFIG));
-        assertEquals(null, returnedProps.get(StreamsConfig.RETRIES_CONFIG));
+        assertNull(returnedProps.get(StreamsConfig.RETRIES_CONFIG));
         assertEquals(5, returnedProps.get(StreamsConfig.adminClientPrefix(StreamsConfig.RETRIES_CONFIG)));
         assertEquals(100, returnedProps.get(StreamsConfig.topicPrefix(TopicConfig.SEGMENT_BYTES_CONFIG)));
     }
@@ -233,7 +233,6 @@ public class StreamsConfigTest {
         assertEquals("host", producerConfigs.get("interceptor.statsd.host"));
     }
 
-
     @Test
     public void shouldSupportPrefixedProducerConfigs() {
         props.put(producerPrefix(ProducerConfig.BUFFER_MEMORY_CONFIG), 10);
@@ -427,7 +426,7 @@ public class StreamsConfigTest {
     public void shouldSetInternalLeaveGroupOnCloseConfigToFalseInConsumer() {
         final StreamsConfig streamsConfig = new StreamsConfig(props);
         final Map<String, Object> consumerConfigs = streamsConfig.getMainConsumerConfigs("groupId", "clientId");
-        assertThat(consumerConfigs.get("internal.leave.group.on.close"), CoreMatchers.<Object>equalTo(false));
+        assertThat(consumerConfigs.get("internal.leave.group.on.close"), CoreMatchers.equalTo(false));
     }
 
     @Test
@@ -570,15 +569,36 @@ public class StreamsConfigTest {
     }
 
     @Test
-    public void shouldThrowExceptionIfMaxInflightRequestsGreatherThanFiveIfEosEnabled() {
-        props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 7);
+    public void shouldThrowExceptionIfMaxInFlightRequestsGreaterThanFiveIfEosEnabled() {
         props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, EXACTLY_ONCE);
+        props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 7);
         final StreamsConfig streamsConfig = new StreamsConfig(props);
         try {
             streamsConfig.getProducerConfigs("clientId");
-            fail("Should throw ConfigException when Eos is enabled and maxInFlight requests exceeds 5");
+            fail("Should throw ConfigException when ESO is enabled and maxInFlight requests exceeds 5");
+        } catch (final ConfigException e) {
+            assertEquals("Invalid value 7 for configuration max.in.flight.requests.per.connection: Can't exceed 5 when exactly-once processing is enabled", e.getMessage());
+        }
+    }
+
+    @Test
+    public void shouldAllowToSpecifyMaxInFlightRequestsPerConnectionAsStringIfEosEnabled() {
+        props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, EXACTLY_ONCE);
+        props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "3");
+
+        new StreamsConfig(props).getProducerConfigs("clientId");
+    }
+
+    @Test
+    public void shouldThrowConfigExceptionIfMaxInFlightRequestsPerConnectionIsInvalidStringIfEosEnabled() {
+        props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, EXACTLY_ONCE);
+        props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "not-a-number");
+
+        try {
+            new StreamsConfig(props).getProducerConfigs("clientId");
+            fail("Should throw ConfigException when EOS is enabled and maxInFlight cannot be paresed into an integer");
         } catch (final ConfigException e) {
-            assertEquals("max.in.flight.requests.per.connection can't exceed 5 when using the idempotent producer", e.getMessage());
+            assertEquals("Invalid value not-a-number for configuration max.in.flight.requests.per.connection: String value could not be parsed as 32-bit integer", e.getMessage());
         }
     }