Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/06/04 05:44:57 UTC

[GitHub] [kafka] mjsax commented on a change in pull request #10813: KAFKA-9559: Change default serde to be `null`

mjsax commented on a change in pull request #10813:
URL: https://github.com/apache/kafka/pull/10813#discussion_r645299252



##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedSerializer.java
##########
@@ -40,7 +40,10 @@ public ChangedSerializer(final Serializer<T> inner) {
     @Override
     public void setIfUnset(final Serializer<Void> defaultKeySerializer, final Serializer<T> defaultValueSerializer) {
         if (inner == null) {
-            inner = Objects.requireNonNull(defaultValueSerializer);
+            if (defaultValueSerializer == null) {
+                throw new ConfigException("Please specify a value serde or set one through the default.value.serde config");

Review comment:
       Same nit as on `ChangedDeserializer`: use `StreamsConfig#DEFAULT_VALUE_SERDE_CLASS_CONFIG`

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedDeserializer.java
##########
@@ -39,7 +39,10 @@ public ChangedDeserializer(final Deserializer<T> inner) {
     @Override
     public void setIfUnset(final Deserializer<Void> defaultKeyDeserializer, final Deserializer<T> defaultValueDeserializer) {
         if (inner == null) {
-            inner = Objects.requireNonNull(defaultValueDeserializer);
+            if (defaultValueDeserializer == null) {
+                throw new ConfigException("Please specify a value serde or set one through the default.value.serde config");

Review comment:
       nit: use `StreamsConfig#DEFAULT_VALUE_SERDE_CLASS_CONFIG` 

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ForeignJoinSubscriptionSendProcessorSupplier.java
##########
@@ -83,9 +84,15 @@ public void init(final ProcessorContext context) {
             valueSerdeTopic = valueSerdeTopicSupplier.get();
             // get default key serde if it wasn't supplied directly at construction
             if (foreignKeySerializer == null) {
+                if (context.keySerde() == null || context.keySerde().serializer() == null) {
+                    throw new ConfigException("Please specify a key serde or set one through the default.key.serde config");
+                }
                 foreignKeySerializer = (Serializer<KO>) context.keySerde().serializer();
             }
             if (valueSerializer == null) {
+                if (context.valueSerde() == null || context.valueSerde().serializer() == null) {

Review comment:
       as above

##########
File path: streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
##########
@@ -767,10 +767,9 @@ public void shouldUseNewConfigsWhenPresent() {
     }
 
     @Test
-    public void shouldUseCorrectDefaultsWhenNoneSpecified() {
-        final StreamsConfig config = new StreamsConfig(getStreamsConfig());
-        assertTrue(config.defaultKeySerde() instanceof Serdes.ByteArraySerde);
-        assertTrue(config.defaultValueSerde() instanceof Serdes.ByteArraySerde);

Review comment:
       Instead of removing those assertions, should we replace them with `assertThrows`?

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ForeignJoinSubscriptionSendProcessorSupplier.java
##########
@@ -83,9 +84,15 @@ public void init(final ProcessorContext context) {
             valueSerdeTopic = valueSerdeTopicSupplier.get();
             // get default key serde if it wasn't supplied directly at construction
             if (foreignKeySerializer == null) {
+                if (context.keySerde() == null || context.keySerde().serializer() == null) {

Review comment:
       I don't think that a `Serde` should ever wrap `null`. Can we omit the second check?

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionWrapperSerde.java
##########
@@ -115,7 +118,10 @@ public void setIfUnset(final Serializer<K> defaultKeySerializer, final Serialize
         @Override
         public void setIfUnset(final Deserializer<K> defaultKeyDeserializer, final Deserializer<Void> defaultValueDeserializer) {
             if (primaryKeyDeserializer == null) {
-                primaryKeyDeserializer = Objects.requireNonNull(defaultKeyDeserializer);
+                if (defaultKeyDeserializer == null) {
+                    throw new ConfigException("Please specify a key serde or set one through the default.key.serde config");

Review comment:
       as above

##########
File path: streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
##########
@@ -1384,8 +1383,12 @@ public Serde defaultKeySerde() {
             serde.configure(originals(), true);
             return serde;
         } catch (final Exception e) {
-            throw new StreamsException(
-                String.format("Failed to configure key serde %s", keySerdeConfigSetting), e);
+            if (e instanceof NullPointerException) {

Review comment:
       I think it would be cleaner to check whether `DEFAULT_KEY_SERDE_CLASS_CONFIG` is set (i.e., `keySerdeConfigSetting == null`) before calling `getConfiguredInstance()`, and just return `null` if it's not set, instead of handling an NPE.
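
A minimal sketch of that ordering, not the actual `StreamsConfig` code. The class `DefaultSerdeLookup`, the `configs` map, and the `instantiate` function are hypothetical stand-ins for the config object and `getConfiguredInstance()`, and `IllegalStateException` stands in for `StreamsException`. The setting is looked up first and `null` is returned when it is absent, so no NPE needs to be caught.

```java
import java.util.Map;
import java.util.function.Function;

// Hypothetical stand-in for the lookup done in StreamsConfig#defaultKeySerde().
final class DefaultSerdeLookup {
    // Same value as StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG.
    static final String DEFAULT_KEY_SERDE_CLASS_CONFIG = "default.key.serde";

    // `instantiate` is an assumed replacement for getConfiguredInstance().
    static <S> S defaultKeySerde(final Map<String, ?> configs,
                                 final Function<Object, S> instantiate) {
        final Object keySerdeConfigSetting = configs.get(DEFAULT_KEY_SERDE_CLASS_CONFIG);
        if (keySerdeConfigSetting == null) {
            // Nothing configured: report "no default" instead of failing later with an NPE.
            return null;
        }
        try {
            return instantiate.apply(keySerdeConfigSetting);
        } catch (final RuntimeException e) {
            // Only genuine instantiation/configuration failures end up here.
            throw new IllegalStateException(
                String.format("Failed to configure key serde %s", keySerdeConfigSetting), e);
        }
    }
}
```

With this shape the `catch` block keeps its original single purpose, and the same pattern would apply to the value serde.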

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionResponseWrapperSerde.java
##########
@@ -105,7 +108,10 @@ private SubscriptionResponseWrapperDeserializer(final Deserializer<V> deserializ
         @Override
         public void setIfUnset(final Deserializer<Void> defaultKeyDeserializer, final Deserializer<V> defaultValueDeserializer) {
             if (deserializer == null) {
-                deserializer = Objects.requireNonNull(defaultValueDeserializer);
+                if (defaultValueDeserializer == null) {
+                    throw new ConfigException("Please specify a value serde or set one through the default.value.serde config");

Review comment:
       as above

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/WrappingNullableUtils.java
##########
@@ -37,10 +38,16 @@
         return deserializerToUse;
     }
     @SuppressWarnings("unchecked")
-    private static <T> Serializer<T> prepareSerializer(final Serializer<T> specificSerializer, final Serializer<?> contextKeySerializer, final Serializer<?> contextValueSerializer, final boolean isKey) {
-        Serializer<T> serializerToUse = specificSerializer;
-        if (serializerToUse == null) {
+    private static <T> Serializer<T> prepareSerializer(final Serializer<T> specificSerializer, final Serializer<?> contextKeySerializer, final Serializer<?> contextValueSerializer, final boolean isKey, final String name) {
+        final Serializer<T> serializerToUse;
+        if (specificSerializer == null) {
             serializerToUse = (Serializer<T>) (isKey ? contextKeySerializer : contextValueSerializer);
+        } else {
+            serializerToUse = specificSerializer;
+        }
+        if (serializerToUse == null) {
+            final String serde = isKey ? "key" : "value";
+            throw new ConfigException("Please specify a " + serde + " serde through produced or materialized, or set one through the default." + serde + ".serde config for node " + name);

Review comment:
       as above. use `StreamsConfig#...`
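
A rough sketch of what using the constants could look like for the message built in this hunk. `SerdeErrorMessages` and `missingSerde` are hypothetical names; the two constants are redeclared locally only to keep the example self-contained, and in the PR they would be referenced from `StreamsConfig` directly.

```java
// Hypothetical helper; only the two config names are taken from Kafka.
final class SerdeErrorMessages {
    // Same values as StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG and
    // StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG.
    static final String DEFAULT_KEY_SERDE_CLASS_CONFIG = "default.key.serde";
    static final String DEFAULT_VALUE_SERDE_CLASS_CONFIG = "default.value.serde";

    static String missingSerde(final boolean isKey, final String nodeName) {
        final String serde = isKey ? "key" : "value";
        // Pick the constant instead of assembling "default." + serde + ".serde" by hand.
        final String config = isKey ? DEFAULT_KEY_SERDE_CLASS_CONFIG : DEFAULT_VALUE_SERDE_CLASS_CONFIG;
        return "Please specify a " + serde + " serde through produced or materialized, or set one through the "
            + config + " config for node " + nodeName;
    }
}
```

Referencing the constants keeps the message in sync if a config name ever changes.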

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/WrappingNullableUtils.java
##########
@@ -49,9 +56,15 @@
 
     @SuppressWarnings({"rawtypes", "unchecked"})
     private static <T> Serde<T> prepareSerde(final Serde<T> specificSerde, final Serde<?> contextKeySerde, final Serde<?> contextValueSerde, final boolean isKey) {
-        Serde<T> serdeToUse = specificSerde;
-        if (serdeToUse == null) {
+        final Serde<T> serdeToUse;
+        if (specificSerde == null) {
             serdeToUse = (Serde<T>) (isKey ?  contextKeySerde : contextValueSerde);
+        } else {
+            serdeToUse = specificSerde;
+        }
+        if (serdeToUse == null) {
+            final String serde = isKey ? "key" : "value";
+            throw new ConfigException("Please specify a " + serde + " serde or set one through the default." + serde + ".serde config");

Review comment:
       as above

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionResponseWrapperSerde.java
##########
@@ -57,7 +57,10 @@ private SubscriptionResponseWrapperSerializer(final Serializer<V> serializer) {
         @Override
         public void setIfUnset(final Serializer<Void> defaultKeySerializer, final Serializer<V> defaultValueSerializer) {
             if (serializer == null) {
-                serializer = Objects.requireNonNull(defaultValueSerializer);
+                if (defaultValueSerializer == null) {
+                    throw new ConfigException("Please specify a value serde or set one through the default.value.serde config");

Review comment:
       as above

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordDeserializer.java
##########
@@ -78,6 +79,10 @@
                 throw new StreamsException("Fatal user code error in deserialization error callback", fatalUserException);
             }
 
+            if (deserializationException instanceof ConfigException) {

Review comment:
       Should this check be done before we call `deserializationExceptionHandler.handle`?
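
A minimal sketch of the ordering this question is about, under stated assumptions: `DeserializationFailureSketch`, `ConfigProblem` (a stand-in for `ConfigException`), and the `Predicate`-based handler are all hypothetical and not the real `RecordDeserializer` API. The configuration error is rethrown before the user-supplied handler is consulted, so the handler never gets the chance to skip it.

```java
import java.util.function.Predicate;

// Hypothetical model of the error path; not the actual RecordDeserializer code.
final class DeserializationFailureSketch {
    // Stand-in for org.apache.kafka.common.config.ConfigException.
    static final class ConfigProblem extends RuntimeException {
        ConfigProblem(final String message) {
            super(message);
        }
    }

    // Returns true if the handler chose to continue; otherwise rethrows the failure.
    static boolean handle(final RuntimeException deserializationException,
                          final Predicate<Exception> handler) {
        // Checked first: a missing serde is a setup error, not a bad record,
        // so the handler is never asked whether to skip it.
        if (deserializationException instanceof ConfigProblem) {
            throw deserializationException;
        }
        if (handler.test(deserializationException)) {
            return true;
        }
        throw deserializationException;
    }
}
```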

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionWrapperSerde.java
##########
@@ -55,7 +55,10 @@ public SubscriptionWrapperSerde(final Supplier<String> primaryKeySerializationPs
         @Override
         public void setIfUnset(final Serializer<K> defaultKeySerializer, final Serializer<Void> defaultValueSerializer) {
             if (primaryKeySerializer == null) {
-                primaryKeySerializer = Objects.requireNonNull(defaultKeySerializer);
+                if (defaultKeySerializer == null) {
+                    throw new ConfigException("Please specify a key serde or set one through the default.key.serde config");

Review comment:
       as above

##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java
##########
@@ -160,8 +160,8 @@ private void testMetrics(final String builtInMetricsVersion) {
     }
 
     @Test
-    public void testTopologyLevelClassCastException() {
-        // Serdes configuration is missing (default will be used which don't match the DSL below), which will trigger the new exception
+    public void testTopologyLevelConfigException() {

Review comment:
       It seems this test should verify the error for misconfigured (but existing) serdes.
   
   Thus, it seems we should set byte-array serdes on the config but not modify the test in any other way?

##########
File path: streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
##########
@@ -1403,8 +1406,12 @@ public Serde defaultValueSerde() {
             serde.configure(originals(), false);
             return serde;
         } catch (final Exception e) {
-            throw new StreamsException(
-                String.format("Failed to configure value serde %s", valueSerdeConfigSetting), e);
+            if (e instanceof NullPointerException) {

Review comment:
       as above.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNode.java
##########
@@ -51,10 +52,16 @@ public SourceNode(final String name,
     }
 
     KIn deserializeKey(final String topic, final Headers headers, final byte[] data) {
+        if (keyDeserializer == null) {

Review comment:
       In which case can `keyDeserializer` be `null` here?
   
   Shouldn't we rather ensure that it's never `null` to begin with, e.g., by doing the check in the `init()` method?
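
A rough sketch of the alternative raised here, with hypothetical names throughout: `SourceNodeSketch` is not the real `SourceNode`, a plain `Function<byte[], K>` stands in for `Deserializer<K>`, and `IllegalStateException` stands in for `ConfigException`. The null check happens once in `init()`, so `deserializeKey` can rely on the field being set.

```java
import java.util.function.Function;

// Hypothetical simplification of a source node; not the actual SourceNode class.
final class SourceNodeSketch<K> {
    private Function<byte[], K> keyDeserializer;

    SourceNodeSketch(final Function<byte[], K> keyDeserializer) {
        this.keyDeserializer = keyDeserializer;
    }

    // Resolve the deserializer once and fail fast if none is available.
    void init(final Function<byte[], K> defaultKeyDeserializer) {
        if (keyDeserializer == null) {
            keyDeserializer = defaultKeyDeserializer;
        }
        if (keyDeserializer == null) {
            throw new IllegalStateException(
                "Please specify a key serde or set one through the default.key.serde config");
        }
    }

    // No null check needed here: init() guarantees the field is set.
    K deserializeKey(final byte[] data) {
        return keyDeserializer.apply(data);
    }
}
```

The trade-off is that the error surfaces at initialization rather than on the first record, which is usually easier to diagnose.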



