You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2017/07/26 19:58:35 UTC
kafka git commit: MINOR: updated configs to use one try/catch for
serdes
Repository: kafka
Updated Branches:
refs/heads/trunk f4d1305bf -> f8498ec9e
MINOR: updated configs to use one try/catch for serdes
Removed `try/catch` from the `keySerde` and `valueSerde` methods so that only the `try/catch` blocks in `defaultKeySerde` and `defaultValueSerde` perform error handling, resulting in the correct error message.
Author: Bill Bejeck <bi...@confluent.io>
Reviewers: Matthias J. Sax <ma...@confluent.io>, Guozhang Wang <wa...@gmail.com>
Closes #3568 from bbejeck/MINOR_ensure_correct_error_messages_for_configs
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/f8498ec9
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/f8498ec9
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/f8498ec9
Branch: refs/heads/trunk
Commit: f8498ec9e27ca0f08e3791d7a19ad8c6a97e210f
Parents: f4d1305
Author: Bill Bejeck <bi...@confluent.io>
Authored: Wed Jul 26 12:58:33 2017 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Wed Jul 26 12:58:33 2017 -0700
----------------------------------------------------------------------
.../org/apache/kafka/streams/StreamsConfig.java | 20 +++----
.../apache/kafka/streams/StreamsConfigTest.java | 55 ++++++++++++++++++++
2 files changed, 63 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/f8498ec9/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
index 94ad87b..e4869ee 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
@@ -746,11 +746,7 @@ public class StreamsConfig extends AbstractConfig {
*/
@Deprecated
public Serde keySerde() {
- try {
- return defaultKeySerde();
- } catch (final Exception e) {
- throw new StreamsException(String.format("Failed to configure key serde %s", get(KEY_SERDE_CLASS_CONFIG)), e);
- }
+ return defaultKeySerde();
}
/**
@@ -760,16 +756,18 @@ public class StreamsConfig extends AbstractConfig {
* @return an configured instance of key Serde class
*/
public Serde defaultKeySerde() {
+ Object keySerdeConfigSetting = get(KEY_SERDE_CLASS_CONFIG);
try {
Serde<?> serde = getConfiguredInstance(KEY_SERDE_CLASS_CONFIG, Serde.class);
if (serde == null) {
+ keySerdeConfigSetting = get(DEFAULT_KEY_SERDE_CLASS_CONFIG);
serde = getConfiguredInstance(DEFAULT_KEY_SERDE_CLASS_CONFIG, Serde.class);
}
serde.configure(originals(), true);
return serde;
} catch (final Exception e) {
throw new StreamsException(
- String.format("Failed to configure key serde %s", get(DEFAULT_KEY_SERDE_CLASS_CONFIG)), e);
+ String.format("Failed to configure key serde %s", keySerdeConfigSetting), e);
}
}
@@ -781,11 +779,7 @@ public class StreamsConfig extends AbstractConfig {
*/
@Deprecated
public Serde valueSerde() {
- try {
- return defaultValueSerde();
- } catch (final Exception e) {
- throw new StreamsException(String.format("Failed to configure value serde %s", get(VALUE_SERDE_CLASS_CONFIG)), e);
- }
+ return defaultValueSerde();
}
/**
@@ -795,9 +789,11 @@ public class StreamsConfig extends AbstractConfig {
* @return an configured instance of value Serde class
*/
public Serde defaultValueSerde() {
+ Object valueSerdeConfigSetting = get(VALUE_SERDE_CLASS_CONFIG);
try {
Serde<?> serde = getConfiguredInstance(VALUE_SERDE_CLASS_CONFIG, Serde.class);
if (serde == null) {
+ valueSerdeConfigSetting = get(DEFAULT_VALUE_SERDE_CLASS_CONFIG);
serde = getConfiguredInstance(DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serde.class);
}
serde.configure(originals(), false);
@@ -805,7 +801,7 @@ public class StreamsConfig extends AbstractConfig {
return serde;
} catch (final Exception e) {
throw new StreamsException(
- String.format("Failed to configure value serde %s", get(DEFAULT_VALUE_SERDE_CLASS_CONFIG)), e);
+ String.format("Failed to configure value serde %s", valueSerdeConfigSetting), e);
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/f8498ec9/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
index 9f0f67a..3bbd69e 100644
--- a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
@@ -50,6 +50,7 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
public class StreamsConfigTest {
@@ -428,6 +429,60 @@ public class StreamsConfigTest {
assertTrue(config.defaultTimestampExtractor() instanceof FailOnInvalidTimestamp);
}
+ @Test
+ public void shouldSpecifyCorrectKeySerdeClassOnErrorUsingDeprecatedConfigs() {
+ final Properties props = minimalStreamsConfig();
+ props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, MisconfiguredSerde.class);
+ final StreamsConfig config = new StreamsConfig(props);
+ try {
+ config.keySerde();
+ fail("Test should throw a StreamsException");
+ } catch (StreamsException e) {
+ assertEquals("Failed to configure key serde class org.apache.kafka.streams.StreamsConfigTest$MisconfiguredSerde", e.getMessage());
+ }
+ }
+
+ @Test
+ public void shouldSpecifyCorrectKeySerdeClassOnError() {
+ final Properties props = minimalStreamsConfig();
+ props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, MisconfiguredSerde.class);
+ final StreamsConfig config = new StreamsConfig(props);
+ try {
+ config.keySerde();
+ fail("Test should throw a StreamsException");
+ } catch (StreamsException e) {
+ assertEquals("Failed to configure key serde class org.apache.kafka.streams.StreamsConfigTest$MisconfiguredSerde", e.getMessage());
+ }
+ }
+
+ @Test
+ public void shouldSpecifyCorrectValueSerdeClassOnErrorUsingDeprecatedConfigs() {
+ final Properties props = minimalStreamsConfig();
+ props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, MisconfiguredSerde.class);
+ final StreamsConfig config = new StreamsConfig(props);
+ try {
+ config.valueSerde();
+ fail("Test should throw a StreamsException");
+ } catch (StreamsException e) {
+ assertEquals("Failed to configure value serde class org.apache.kafka.streams.StreamsConfigTest$MisconfiguredSerde", e.getMessage());
+ }
+ }
+
+ @Test
+ public void shouldSpecifyCorrectValueSerdeClassOnError() {
+ final Properties props = minimalStreamsConfig();
+ props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, MisconfiguredSerde.class);
+ final StreamsConfig config = new StreamsConfig(props);
+ try {
+ config.valueSerde();
+ fail("Test should throw a StreamsException");
+ } catch (StreamsException e) {
+ assertEquals("Failed to configure value serde class org.apache.kafka.streams.StreamsConfigTest$MisconfiguredSerde", e.getMessage());
+ }
+ }
+
+
+
static class MisconfiguredSerde implements Serde {
@Override
public void configure(final Map configs, final boolean isKey) {