You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2017/07/26 19:58:35 UTC

kafka git commit: MINOR: updated configs to use one try/catch for serdes

Repository: kafka
Updated Branches:
  refs/heads/trunk f4d1305bf -> f8498ec9e


MINOR: updated configs to use one try/catch for serdes

Removed the `try/catch` from the `keySerde` and `valueSerde` methods so that only the `try/catch` blocks in `defaultKeySerde` and `defaultValueSerde` perform error handling, resulting in the correct error message.

Author: Bill Bejeck <bi...@confluent.io>

Reviewers: Matthias J. Sax <ma...@confluent.io>, Guozhang Wang <wa...@gmail.com>

Closes #3568 from bbejeck/MINOR_ensure_correct_error_messages_for_configs


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/f8498ec9
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/f8498ec9
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/f8498ec9

Branch: refs/heads/trunk
Commit: f8498ec9e27ca0f08e3791d7a19ad8c6a97e210f
Parents: f4d1305
Author: Bill Bejeck <bi...@confluent.io>
Authored: Wed Jul 26 12:58:33 2017 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Wed Jul 26 12:58:33 2017 -0700

----------------------------------------------------------------------
 .../org/apache/kafka/streams/StreamsConfig.java | 20 +++----
 .../apache/kafka/streams/StreamsConfigTest.java | 55 ++++++++++++++++++++
 2 files changed, 63 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/f8498ec9/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
index 94ad87b..e4869ee 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
@@ -746,11 +746,7 @@ public class StreamsConfig extends AbstractConfig {
      */
     @Deprecated
     public Serde keySerde() {
-        try {
-            return defaultKeySerde();
-        } catch (final Exception e) {
-            throw new StreamsException(String.format("Failed to configure key serde %s", get(KEY_SERDE_CLASS_CONFIG)), e);
-        }
+        return defaultKeySerde();
     }
 
     /**
@@ -760,16 +756,18 @@ public class StreamsConfig extends AbstractConfig {
      * @return an configured instance of key Serde class
      */
     public Serde defaultKeySerde() {
+        Object keySerdeConfigSetting = get(KEY_SERDE_CLASS_CONFIG);
         try {
             Serde<?> serde = getConfiguredInstance(KEY_SERDE_CLASS_CONFIG, Serde.class);
             if (serde == null) {
+                keySerdeConfigSetting = get(DEFAULT_KEY_SERDE_CLASS_CONFIG);
                 serde = getConfiguredInstance(DEFAULT_KEY_SERDE_CLASS_CONFIG, Serde.class);
             }
             serde.configure(originals(), true);
             return serde;
         } catch (final Exception e) {
             throw new StreamsException(
-                String.format("Failed to configure key serde %s", get(DEFAULT_KEY_SERDE_CLASS_CONFIG)), e);
+                String.format("Failed to configure key serde %s", keySerdeConfigSetting), e);
         }
     }
 
@@ -781,11 +779,7 @@ public class StreamsConfig extends AbstractConfig {
      */
     @Deprecated
     public Serde valueSerde() {
-        try {
-            return defaultValueSerde();
-        } catch (final Exception e) {
-            throw new StreamsException(String.format("Failed to configure value serde %s", get(VALUE_SERDE_CLASS_CONFIG)), e);
-        }
+        return defaultValueSerde();
     }
 
     /**
@@ -795,9 +789,11 @@ public class StreamsConfig extends AbstractConfig {
      * @return an configured instance of value Serde class
      */
     public Serde defaultValueSerde() {
+        Object valueSerdeConfigSetting = get(VALUE_SERDE_CLASS_CONFIG);
         try {
             Serde<?> serde = getConfiguredInstance(VALUE_SERDE_CLASS_CONFIG, Serde.class);
             if (serde == null) {
+                valueSerdeConfigSetting = get(DEFAULT_VALUE_SERDE_CLASS_CONFIG);
                 serde = getConfiguredInstance(DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serde.class);
             }
             serde.configure(originals(), false);
@@ -805,7 +801,7 @@ public class StreamsConfig extends AbstractConfig {
             return serde;
         } catch (final Exception e) {
             throw new StreamsException(
-                String.format("Failed to configure value serde %s", get(DEFAULT_VALUE_SERDE_CLASS_CONFIG)), e);
+                String.format("Failed to configure value serde %s", valueSerdeConfigSetting), e);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/f8498ec9/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
index 9f0f67a..3bbd69e 100644
--- a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
@@ -50,6 +50,7 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 public class StreamsConfigTest {
 
@@ -428,6 +429,60 @@ public class StreamsConfigTest {
         assertTrue(config.defaultTimestampExtractor() instanceof FailOnInvalidTimestamp);
     }
 
+    @Test
+    public void shouldSpecifyCorrectKeySerdeClassOnErrorUsingDeprecatedConfigs() {
+        final Properties props = minimalStreamsConfig();
+        props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, MisconfiguredSerde.class);
+        final StreamsConfig config = new StreamsConfig(props);
+        try {
+            config.keySerde();
+            fail("Test should throw a StreamsException");
+        } catch (StreamsException e) {
+            assertEquals("Failed to configure key serde class org.apache.kafka.streams.StreamsConfigTest$MisconfiguredSerde", e.getMessage());
+        }
+    }
+
+    @Test
+    public void shouldSpecifyCorrectKeySerdeClassOnError() {
+        final Properties props = minimalStreamsConfig();
+        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, MisconfiguredSerde.class);
+        final StreamsConfig config = new StreamsConfig(props);
+        try {
+            config.keySerde();
+            fail("Test should throw a StreamsException");
+        } catch (StreamsException e) {
+            assertEquals("Failed to configure key serde class org.apache.kafka.streams.StreamsConfigTest$MisconfiguredSerde", e.getMessage());
+        }
+    }
+
+    @Test
+    public void shouldSpecifyCorrectValueSerdeClassOnErrorUsingDeprecatedConfigs() {
+        final Properties props = minimalStreamsConfig();
+        props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, MisconfiguredSerde.class);
+        final StreamsConfig config = new StreamsConfig(props);
+        try {
+            config.valueSerde();
+            fail("Test should throw a StreamsException");
+        } catch (StreamsException e) {
+            assertEquals("Failed to configure value serde class org.apache.kafka.streams.StreamsConfigTest$MisconfiguredSerde", e.getMessage());
+        }
+    }
+
+    @Test
+    public void shouldSpecifyCorrectValueSerdeClassOnError() {
+        final Properties props = minimalStreamsConfig();
+        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, MisconfiguredSerde.class);
+        final StreamsConfig config = new StreamsConfig(props);
+        try {
+            config.valueSerde();
+            fail("Test should throw a StreamsException");
+        } catch (StreamsException e) {
+            assertEquals("Failed to configure value serde class org.apache.kafka.streams.StreamsConfigTest$MisconfiguredSerde", e.getMessage());
+        }
+    }
+
+
+
     static class MisconfiguredSerde implements Serde {
         @Override
         public void configure(final Map configs, final boolean isKey) {