Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/06/03 22:51:35 UTC

[GitHub] [kafka] lct45 opened a new pull request #10813: KAFKA-9559: Change default serde to be `null`

lct45 opened a new pull request #10813:
URL: https://github.com/apache/kafka/pull/10813


   Implementation of [KIP-741](https://cwiki.apache.org/confluence/display/KAFKA/KIP-741%3A+Change+default+serde+to+be+null) per [KAFKA-9559](https://issues.apache.org/jira/browse/KAFKA-9559). Changes the default serde from `byteArray` to `null`. This allows us to throw a `ConfigException` instead of an NPE, which gives users a better idea of where their serdes are misconfigured.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
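
The effect described in the summary above can be sketched as follows. This is a minimal illustration under stated assumptions, not the PR's code: `ConfigException` is a local stand-in for `org.apache.kafka.common.config.ConfigException`, and `resolveSerde` is a hypothetical helper. The only real name used is the config key `default.key.serde`.

```java
class DefaultSerdeSketch {
    // Hypothetical helper: prefer the explicitly supplied serde, otherwise the configured default.
    // With a null default, the caller gets a descriptive error instead of an NPE later on.
    static <T> T resolveSerde(final T specificSerde, final T defaultSerde, final String configName) {
        final T serdeToUse = specificSerde != null ? specificSerde : defaultSerde;
        if (serdeToUse == null) {
            throw new ConfigException(
                "Please specify a serde or set one through the " + configName + " config");
        }
        return serdeToUse;
    }

    public static void main(final String[] args) {
        final String resolved = resolveSerde(null, "configuredDefault", "default.key.serde");
        System.out.println("resolved: " + resolved);
        try {
            resolveSerde(null, null, "default.key.serde");
        } catch (final ConfigException e) {
            System.out.println("misconfigured: " + e.getMessage());
        }
    }
}

// Local stand-in (assumption) so the sketch compiles without Kafka on the classpath.
class ConfigException extends RuntimeException {
    ConfigException(final String message) {
        super(message);
    }
}
```

The point of the design is that the error names the config to set, which a bare `NullPointerException` cannot do.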
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] lct45 commented on a change in pull request #10813: KAFKA-9559: Change default serde to be `null`

Posted by GitBox <gi...@apache.org>.
lct45 commented on a change in pull request #10813:
URL: https://github.com/apache/kafka/pull/10813#discussion_r661681961



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordDeserializer.java
##########
@@ -69,6 +70,9 @@
                 Optional.empty()
             );
         } catch (final Exception deserializationException) {
+            if (deserializationException instanceof ConfigException) {

Review comment:
       IIRC it's deep in `sourceNode.deserializeKey()` but I ran into this in early testing so it may be obsolete. I removed







[GitHub] [kafka] lct45 commented on a change in pull request #10813: KAFKA-9559: Change default serde to be `null`

Posted by GitBox <gi...@apache.org>.
lct45 commented on a change in pull request #10813:
URL: https://github.com/apache/kafka/pull/10813#discussion_r648383533



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java
##########
@@ -60,8 +60,8 @@ public void init(final InternalProcessorContext context) {
         this.context = context;
         final Serializer<?> contextKeySerializer = ProcessorContextUtils.getKeySerializer(context);
         final Serializer<?> contextValueSerializer = ProcessorContextUtils.getValueSerializer(context);
-        keySerializer = prepareKeySerializer(keySerializer, contextKeySerializer, contextValueSerializer);
-        valSerializer = prepareValueSerializer(valSerializer, contextKeySerializer, contextValueSerializer);
+        keySerializer = prepareKeySerializer(keySerializer, contextKeySerializer, contextValueSerializer, this.name());
+        valSerializer = prepareValueSerializer(valSerializer, contextKeySerializer, contextValueSerializer, this.name());

Review comment:
       @ableegoldman and I talked about this PR briefly yesterday, maybe she has thoughts here







[GitHub] [kafka] lct45 commented on a change in pull request #10813: KAFKA-9559: Change default serde to be `null`

Posted by GitBox <gi...@apache.org>.
lct45 commented on a change in pull request #10813:
URL: https://github.com/apache/kafka/pull/10813#discussion_r661757249



##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedDeserializer.java
##########
@@ -39,7 +39,10 @@ public ChangedDeserializer(final Deserializer<T> inner) {
     @Override
     public void setIfUnset(final Deserializer<Void> defaultKeyDeserializer, final Deserializer<T> defaultValueDeserializer) {

Review comment:
       That's smart, and seems to mostly work. In order to change the actual `setIfUnset` code though, I have to do some casting since the serde in `serdeGetter` is <?> and I'm running into issues with that now... but just for one set of classes 😕. Do you know if there's a better way to handle the generics? @mjsax 
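
One common way to bridge a wildcard-typed getter and a typed field is a single, localized unchecked cast. The sketch below is only an illustration of that generics pattern: `Deserializer` and `SerdeGetter` are simplified stand-ins for the real types, and `WrappingDeserializer` and `Utf8Deserializer` are hypothetical classes. It is not offered as the resolution chosen for this PR.

```java
import java.nio.charset.StandardCharsets;

class WildcardCastSketch {
    public static void main(final String[] args) {
        final WrappingDeserializer<String> wrapper = new WrappingDeserializer<>();
        // The getter only promises Deserializer<?>; the wrapper narrows it to Deserializer<String>.
        wrapper.setIfUnset(() -> new Utf8Deserializer());
        System.out.println(wrapper.deserialize("hello".getBytes(StandardCharsets.UTF_8)));
    }
}

// Simplified stand-in for Kafka's Deserializer (assumption).
interface Deserializer<T> {
    T deserialize(byte[] data);
}

// Simplified stand-in: the getter exposes its default with a wildcard type.
interface SerdeGetter {
    Deserializer<?> valueDeserializer();
}

class Utf8Deserializer implements Deserializer<String> {
    @Override
    public String deserialize(final byte[] data) {
        return new String(data, StandardCharsets.UTF_8);
    }
}

class WrappingDeserializer<T> {
    private Deserializer<T> inner;

    // The compiler cannot verify this cast; it is safe only if the default
    // really produces values of type T, so the suppression is kept on this one method.
    @SuppressWarnings("unchecked")
    void setIfUnset(final SerdeGetter getter) {
        if (inner == null) {
            inner = (Deserializer<T>) getter.valueDeserializer();
        }
    }

    T deserialize(final byte[] data) {
        return inner.deserialize(data);
    }
}
```

Keeping the cast inside `setIfUnset` means callers stay fully typed, at the cost of a possible `ClassCastException` at use time if the default does not match `T`.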
   







[GitHub] [kafka] lct45 commented on a change in pull request #10813: KAFKA-9559: Change default serde to be `null`

Posted by GitBox <gi...@apache.org>.
lct45 commented on a change in pull request #10813:
URL: https://github.com/apache/kafka/pull/10813#discussion_r646992164



##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java
##########
@@ -160,8 +160,8 @@ private void testMetrics(final String builtInMetricsVersion) {
     }
 
     @Test
-    public void testTopologyLevelClassCastException() {
-        // Serdes configuration is missing (default will be used which don't match the DSL below), which will trigger the new exception
+    public void testTopologyLevelConfigException() {

Review comment:
       Never mind, got it! It can be passed in with the topology test driver (: 







[GitHub] [kafka] mjsax commented on a change in pull request #10813: KAFKA-9559: Change default serde to be `null`

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #10813:
URL: https://github.com/apache/kafka/pull/10813#discussion_r645299252



##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedSerializer.java
##########
@@ -40,7 +40,10 @@ public ChangedSerializer(final Serializer<T> inner) {
     @Override
     public void setIfUnset(final Serializer<Void> defaultKeySerializer, final Serializer<T> defaultValueSerializer) {
         if (inner == null) {
-            inner = Objects.requireNonNull(defaultValueSerializer);
+            if (defaultValueSerializer == null) {
+                throw new ConfigException("Please specify a value serde or set one through the default.value.serde config");

Review comment:
       as above

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedDeserializer.java
##########
@@ -39,7 +39,10 @@ public ChangedDeserializer(final Deserializer<T> inner) {
     @Override
     public void setIfUnset(final Deserializer<Void> defaultKeyDeserializer, final Deserializer<T> defaultValueDeserializer) {
         if (inner == null) {
-            inner = Objects.requireNonNull(defaultValueDeserializer);
+            if (defaultValueDeserializer == null) {
+                throw new ConfigException("Please specify a value serde or set one through the default.value.serde config");

Review comment:
       nit: use `StreamsConfig#DEFAULT_VALUE_SERDE_CLASS_CONFIG` 

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ForeignJoinSubscriptionSendProcessorSupplier.java
##########
@@ -83,9 +84,15 @@ public void init(final ProcessorContext context) {
             valueSerdeTopic = valueSerdeTopicSupplier.get();
             // get default key serde if it wasn't supplied directly at construction
             if (foreignKeySerializer == null) {
+                if (context.keySerde() == null || context.keySerde().serializer() == null) {
+                    throw new ConfigException("Please specify a key serde or set one through the default.key.serde config");
+                }
                 foreignKeySerializer = (Serializer<KO>) context.keySerde().serializer();
             }
             if (valueSerializer == null) {
+                if (context.valueSerde() == null || context.valueSerde().serializer() == null) {

Review comment:
       as above

##########
File path: streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
##########
@@ -767,10 +767,9 @@ public void shouldUseNewConfigsWhenPresent() {
     }
 
     @Test
-    public void shouldUseCorrectDefaultsWhenNoneSpecified() {
-        final StreamsConfig config = new StreamsConfig(getStreamsConfig());
-        assertTrue(config.defaultKeySerde() instanceof Serdes.ByteArraySerde);
-        assertTrue(config.defaultValueSerde() instanceof Serdes.ByteArraySerde);

Review comment:
       Instead of removing those, should we replace them with `assertThrows` ?

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ForeignJoinSubscriptionSendProcessorSupplier.java
##########
@@ -83,9 +84,15 @@ public void init(final ProcessorContext context) {
             valueSerdeTopic = valueSerdeTopicSupplier.get();
             // get default key serde if it wasn't supplied directly at construction
             if (foreignKeySerializer == null) {
+                if (context.keySerde() == null || context.keySerde().serializer() == null) {

Review comment:
       I don't think that a `Serde` should ever wrap `null` ? Can we omit the second check?

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionWrapperSerde.java
##########
@@ -115,7 +118,10 @@ public void setIfUnset(final Serializer<K> defaultKeySerializer, final Serialize
         @Override
         public void setIfUnset(final Deserializer<K> defaultKeyDeserializer, final Deserializer<Void> defaultValueDeserializer) {
             if (primaryKeyDeserializer == null) {
-                primaryKeyDeserializer = Objects.requireNonNull(defaultKeyDeserializer);
+                if (defaultKeyDeserializer == null) {
+                    throw new ConfigException("Please specify a key serde or set one through the default.key.serde config");

Review comment:
       as above

##########
File path: streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
##########
@@ -1384,8 +1383,12 @@ public Serde defaultKeySerde() {
             serde.configure(originals(), true);
             return serde;
         } catch (final Exception e) {
-            throw new StreamsException(
-                String.format("Failed to configure key serde %s", keySerdeConfigSetting), e);
+            if (e instanceof NullPointerException) {

Review comment:
       I think it would be cleaner to check whether `DEFAULT_KEY_SERDE_CLASS_CONFIG` is set (i.e., `keySerdeConfigSetting == null`) before calling `getConfiguredInstance()`, and just return `null` if it's not set, instead of handling an NPE.
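
The suggestion above can be sketched roughly as follows. Everything here is an assumption made so the example compiles on its own: the settings map, `StubSerde`, reflection in place of `getConfiguredInstance()`, and `IllegalStateException` in place of `StreamsException`. Only the config key string `default.key.serde` is taken from Kafka.

```java
import java.util.HashMap;
import java.util.Map;

class DefaultSerdeLookupSketch {
    // Same string value as StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG.
    static final String DEFAULT_KEY_SERDE_CLASS_CONFIG = "default.key.serde";

    private final Map<String, Class<?>> settings;

    DefaultSerdeLookupSketch(final Map<String, Class<?>> settings) {
        this.settings = settings;
    }

    // Checks for an unset config first, so no NullPointerException has to be caught;
    // only genuine instantiation failures reach the catch block.
    Object defaultKeySerde() {
        final Class<?> keySerdeConfigSetting = settings.get(DEFAULT_KEY_SERDE_CLASS_CONFIG);
        if (keySerdeConfigSetting == null) {
            return null;
        }
        try {
            return keySerdeConfigSetting.getDeclaredConstructor().newInstance();
        } catch (final ReflectiveOperationException e) {
            throw new IllegalStateException("Failed to configure key serde " + keySerdeConfigSetting, e);
        }
    }

    public static void main(final String[] args) {
        final Map<String, Class<?>> settings = new HashMap<>();
        System.out.println("unset: " + new DefaultSerdeLookupSketch(settings).defaultKeySerde());
        settings.put(DEFAULT_KEY_SERDE_CLASS_CONFIG, StubSerde.class);
        System.out.println("set: " + new DefaultSerdeLookupSketch(settings).defaultKeySerde().getClass().getSimpleName());
    }
}

// Hypothetical serde class used only to exercise the lookup.
class StubSerde { }
```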

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionResponseWrapperSerde.java
##########
@@ -105,7 +108,10 @@ private SubscriptionResponseWrapperDeserializer(final Deserializer<V> deserializ
         @Override
         public void setIfUnset(final Deserializer<Void> defaultKeyDeserializer, final Deserializer<V> defaultValueDeserializer) {
             if (deserializer == null) {
-                deserializer = Objects.requireNonNull(defaultValueDeserializer);
+                if (defaultValueDeserializer == null) {
+                    throw new ConfigException("Please specify a value serde or set one through the default.value.serde config");

Review comment:
       as above

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/WrappingNullableUtils.java
##########
@@ -37,10 +38,16 @@
         return deserializerToUse;
     }
     @SuppressWarnings("unchecked")
-    private static <T> Serializer<T> prepareSerializer(final Serializer<T> specificSerializer, final Serializer<?> contextKeySerializer, final Serializer<?> contextValueSerializer, final boolean isKey) {
-        Serializer<T> serializerToUse = specificSerializer;
-        if (serializerToUse == null) {
+    private static <T> Serializer<T> prepareSerializer(final Serializer<T> specificSerializer, final Serializer<?> contextKeySerializer, final Serializer<?> contextValueSerializer, final boolean isKey, final String name) {
+        final Serializer<T> serializerToUse;
+        if (specificSerializer == null) {
             serializerToUse = (Serializer<T>) (isKey ? contextKeySerializer : contextValueSerializer);
+        } else {
+            serializerToUse = specificSerializer;
+        }
+        if (serializerToUse == null) {
+            final String serde = isKey ? "key" : "value";
+            throw new ConfigException("Please specify a " + serde + " serde through produced or materialized, or set one through the default." + serde + ".serde config for node " + name);

Review comment:
       as above. use `StreamsConfig#...`

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/WrappingNullableUtils.java
##########
@@ -49,9 +56,15 @@
 
     @SuppressWarnings({"rawtypes", "unchecked"})
     private static <T> Serde<T> prepareSerde(final Serde<T> specificSerde, final Serde<?> contextKeySerde, final Serde<?> contextValueSerde, final boolean isKey) {
-        Serde<T> serdeToUse = specificSerde;
-        if (serdeToUse == null) {
+        final Serde<T> serdeToUse;
+        if (specificSerde == null) {
             serdeToUse = (Serde<T>) (isKey ?  contextKeySerde : contextValueSerde);
+        } else {
+            serdeToUse = specificSerde;
+        }
+        if (serdeToUse == null) {
+            final String serde = isKey ? "key" : "value";
+            throw new ConfigException("Please specify a " + serde + " serde or set one through the default." + serde + ".serde config");

Review comment:
       as above

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionResponseWrapperSerde.java
##########
@@ -57,7 +57,10 @@ private SubscriptionResponseWrapperSerializer(final Serializer<V> serializer) {
         @Override
         public void setIfUnset(final Serializer<Void> defaultKeySerializer, final Serializer<V> defaultValueSerializer) {
             if (serializer == null) {
-                serializer = Objects.requireNonNull(defaultValueSerializer);
+                if (defaultValueSerializer == null) {
+                    throw new ConfigException("Please specify a value serde or set one through the default.value.serde config");

Review comment:
       as above

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordDeserializer.java
##########
@@ -78,6 +79,10 @@
                 throw new StreamsException("Fatal user code error in deserialization error callback", fatalUserException);
             }
 
+            if (deserializationException instanceof ConfigException) {

Review comment:
       Should this check be done before we call `deserializationExceptionHandler.handle` ?
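
One reading of this question, sketched under assumptions: a configuration error is rethrown before the user-supplied handler is consulted, so the handler only ever sees per-record failures. `Handler`, `deserialize`, and the local `ConfigException` are hypothetical stand-ins, not the `RecordDeserializer` code.

```java
import java.util.function.Supplier;

class HandlerOrderingSketch {
    // Hypothetical stand-in for a deserialization exception handler: true means "skip the record".
    interface Handler {
        boolean shouldContinue(Exception exception);
    }

    static String deserialize(final Supplier<String> deserializer, final Handler handler) {
        try {
            return deserializer.get();
        } catch (final RuntimeException deserializationException) {
            // A configuration problem is not a bad record, so rethrow before asking the handler.
            if (deserializationException instanceof ConfigException) {
                throw deserializationException;
            }
            if (handler.shouldContinue(deserializationException)) {
                return null; // record skipped
            }
            throw new IllegalStateException("Deserialization failed", deserializationException);
        }
    }

    public static void main(final String[] args) {
        final Handler skipEverything = exception -> true;
        final String skipped = deserialize(() -> {
            throw new IllegalArgumentException("corrupt record");
        }, skipEverything);
        System.out.println("skipped record yields: " + skipped);
        try {
            deserialize(() -> {
                throw new ConfigException("no default.key.serde configured");
            }, skipEverything);
        } catch (final ConfigException e) {
            System.out.println("propagated: " + e.getMessage());
        }
    }
}

// Local stand-in (assumption) for org.apache.kafka.common.config.ConfigException.
class ConfigException extends RuntimeException {
    ConfigException(final String message) {
        super(message);
    }
}
```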

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionWrapperSerde.java
##########
@@ -55,7 +55,10 @@ public SubscriptionWrapperSerde(final Supplier<String> primaryKeySerializationPs
         @Override
         public void setIfUnset(final Serializer<K> defaultKeySerializer, final Serializer<Void> defaultValueSerializer) {
             if (primaryKeySerializer == null) {
-                primaryKeySerializer = Objects.requireNonNull(defaultKeySerializer);
+                if (defaultKeySerializer == null) {
+                    throw new ConfigException("Please specify a key serde or set one through the default.key.serde config");

Review comment:
       as above

##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java
##########
@@ -160,8 +160,8 @@ private void testMetrics(final String builtInMetricsVersion) {
     }
 
     @Test
-    public void testTopologyLevelClassCastException() {
-        // Serdes configuration is missing (default will be used which don't match the DSL below), which will trigger the new exception
+    public void testTopologyLevelConfigException() {

Review comment:
       It seems this test should verify the error for misconfigured (but existing) serdes.
   
   Thus, it seems we should set byte-array serdes on the config and not modify the test in any other way?

##########
File path: streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
##########
@@ -1403,8 +1406,12 @@ public Serde defaultValueSerde() {
             serde.configure(originals(), false);
             return serde;
         } catch (final Exception e) {
-            throw new StreamsException(
-                String.format("Failed to configure value serde %s", valueSerdeConfigSetting), e);
+            if (e instanceof NullPointerException) {

Review comment:
       as above.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNode.java
##########
@@ -51,10 +52,16 @@ public SourceNode(final String name,
     }
 
     KIn deserializeKey(final String topic, final Headers headers, final byte[] data) {
+        if (keyDeserializer == null) {

Review comment:
       In which case can `keyDeserializer` be `null` here?
   
   Shouldn't we rather ensure that it's never `null` to begin with, e.g. by doing the check in the `init()` method?
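
A fail-fast variant along these lines might look like the sketch below. It assumes a `Function<byte[], String>` as a stand-in for `Deserializer<K>`, a hypothetical `SourceNodeInitSketch` class, and a local `ConfigException`; the real `SourceNode` is not reproduced here.

```java
import java.nio.charset.StandardCharsets;
import java.util.function.Function;

class SourceNodeInitSketch {
    private Function<byte[], String> keyDeserializer; // stand-in for Deserializer<K>

    SourceNodeInitSketch(final Function<byte[], String> keyDeserializer) {
        this.keyDeserializer = keyDeserializer;
    }

    // Resolve and validate once, so deserializeKey() never has to consider null.
    void init(final Function<byte[], String> defaultKeyDeserializer) {
        if (keyDeserializer == null) {
            keyDeserializer = defaultKeyDeserializer;
        }
        if (keyDeserializer == null) {
            throw new ConfigException(
                "Please specify a key serde or set one through the default.key.serde config");
        }
    }

    String deserializeKey(final byte[] data) {
        return keyDeserializer.apply(data);
    }

    public static void main(final String[] args) {
        final SourceNodeInitSketch node = new SourceNodeInitSketch(null);
        node.init(bytes -> new String(bytes, StandardCharsets.UTF_8));
        System.out.println(node.deserializeKey("key".getBytes(StandardCharsets.UTF_8)));
        try {
            new SourceNodeInitSketch(null).init(null);
        } catch (final ConfigException e) {
            System.out.println("init failed: " + e.getMessage());
        }
    }
}

// Local stand-in (assumption) for org.apache.kafka.common.config.ConfigException.
class ConfigException extends RuntimeException {
    ConfigException(final String message) {
        super(message);
    }
}
```

With the check in `init()`, a missing serde surfaces when the node is initialized rather than on the first record.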







[GitHub] [kafka] mjsax commented on a change in pull request #10813: KAFKA-9559: Change default serde to be `null`

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #10813:
URL: https://github.com/apache/kafka/pull/10813#discussion_r661896938



##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueBuffer.java
##########
@@ -188,10 +188,11 @@ public boolean persistent() {
         return false;
     }
 
+    @SuppressWarnings("unchecked")
     @Override
-    public void setSerdesIfNull(final Serde<K> keySerde, final Serde<V> valueSerde) {
-        this.keySerde = this.keySerde == null ? keySerde : this.keySerde;
-        this.valueSerde = this.valueSerde == null ? FullChangeSerde.wrap(valueSerde) : this.valueSerde;
+    public void setSerdesIfNull(final ProcessorContext context) {
+        keySerde = (keySerde == null) ? (Serde<K>) context.keySerde() : keySerde;

Review comment:
       Why add `()` around `keySerde == null`?







[GitHub] [kafka] lct45 commented on a change in pull request #10813: KAFKA-9559: Change default serde to be `null`

Posted by GitBox <gi...@apache.org>.
lct45 commented on a change in pull request #10813:
URL: https://github.com/apache/kafka/pull/10813#discussion_r646714882



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNode.java
##########
@@ -51,10 +52,16 @@ public SourceNode(final String name,
     }
 
     KIn deserializeKey(final String topic, final Headers headers, final byte[] data) {
+        if (keyDeserializer == null) {

Review comment:
       Ahh, good catch. I did a check in `WrappingNullableUtils` for the serializer but missed it for the deserializers somehow. I added it there and removed this one. Now we throw the config error when we try to build a topology without a default or a serde passed in.







[GitHub] [kafka] mjsax commented on a change in pull request #10813: KAFKA-9559: Change default serde to be `null`

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #10813:
URL: https://github.com/apache/kafka/pull/10813#discussion_r661685610



##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedDeserializer.java
##########
@@ -39,7 +39,10 @@ public ChangedDeserializer(final Deserializer<T> inner) {
     @Override
     public void setIfUnset(final Deserializer<Void> defaultKeyDeserializer, final Deserializer<T> defaultValueDeserializer) {

Review comment:
       That's a good point... Could we add an internal interface "SerdeGetter" (for lack of a better name) that both `ProcessorContext` and `StateStoreContext` extend? `SerdeGetter` would have two methods, `keySerde()` and `valueSerde()`. For this case, we wouldn't need any refactoring (I would hope) and could pass `SerdeGetter` here?
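
The proposal in this comment can be sketched as a small type hierarchy. All types below are stand-ins or hypothetical: `Serde` is reduced to a `name()` method so there is something to print, the two `*Stub` interfaces stand for the real context interfaces, and `SimpleContext` and `NamedSerde` exist only to exercise the sketch.

```java
class SerdeGetterSketch {
    // Takes the shared supertype, so one method serves both context kinds.
    static String describeDefaults(final SerdeGetter getter) {
        return getter.keySerde().name() + "/" + getter.valueSerde().name();
    }

    public static void main(final String[] args) {
        final ProcessorContextStub processorContext = new SimpleContext("string", "long");
        final StateStoreContextStub stateStoreContext = new SimpleContext("bytes", "bytes");
        System.out.println(describeDefaults(processorContext));
        System.out.println(describeDefaults(stateStoreContext));
    }
}

// Stand-in for Kafka's Serde; name() is an assumption made for printing.
interface Serde<T> {
    String name();
}

// The internal interface proposed in the comment.
interface SerdeGetter {
    Serde<?> keySerde();

    Serde<?> valueSerde();
}

// Stubs for the two context interfaces, which would both extend SerdeGetter.
interface ProcessorContextStub extends SerdeGetter { }

interface StateStoreContextStub extends SerdeGetter { }

// Hypothetical implementation used only to exercise the sketch.
class SimpleContext implements ProcessorContextStub, StateStoreContextStub {
    private final String keyName;
    private final String valueName;

    SimpleContext(final String keyName, final String valueName) {
        this.keyName = keyName;
        this.valueName = valueName;
    }

    @Override
    public Serde<?> keySerde() {
        return new NamedSerde(keyName);
    }

    @Override
    public Serde<?> valueSerde() {
        return new NamedSerde(valueName);
    }
}

class NamedSerde implements Serde<Object> {
    private final String name;

    NamedSerde(final String name) {
        this.name = name;
    }

    @Override
    public String name() {
        return name;
    }
}
```

A later diff in this thread (`KeyAndJoinSideSerializer`) does show a `setIfUnset(final SerdeGetter getter)` signature, but the actual definition of `SerdeGetter` is not visible here, so the interface form above should be read as the proposal only.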







[GitHub] [kafka] lct45 commented on a change in pull request #10813: KAFKA-9559: Change default serde to be `null`

Posted by GitBox <gi...@apache.org>.
lct45 commented on a change in pull request #10813:
URL: https://github.com/apache/kafka/pull/10813#discussion_r646974657



##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java
##########
@@ -160,8 +160,8 @@ private void testMetrics(final String builtInMetricsVersion) {
     }
 
     @Test
-    public void testTopologyLevelClassCastException() {
-        // Serdes configuration is missing (default will be used which don't match the DSL below), which will trigger the new exception
+    public void testTopologyLevelConfigException() {

Review comment:
       I see what you mean; I think it would be valuable to have both. The catch for me was figuring out how to set streams configs in this test setup. Is there a way to pass them in somewhere that I'm missing? I haven't been able to find anything @mjsax 







[GitHub] [kafka] mjsax commented on a change in pull request #10813: KAFKA-9559: Change default serde to be `null`

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #10813:
URL: https://github.com/apache/kafka/pull/10813#discussion_r647928413



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java
##########
@@ -60,8 +60,8 @@ public void init(final InternalProcessorContext context) {
         this.context = context;
         final Serializer<?> contextKeySerializer = ProcessorContextUtils.getKeySerializer(context);
         final Serializer<?> contextValueSerializer = ProcessorContextUtils.getValueSerializer(context);
-        keySerializer = prepareKeySerializer(keySerializer, contextKeySerializer, contextValueSerializer);
-        valSerializer = prepareValueSerializer(valSerializer, contextKeySerializer, contextValueSerializer);
+        keySerializer = prepareKeySerializer(keySerializer, contextKeySerializer, contextValueSerializer, this.name());
+        valSerializer = prepareValueSerializer(valSerializer, contextKeySerializer, contextValueSerializer, this.name());

Review comment:
       My argument was mainly about unifying code, i.e., trying to avoid `null`-checks in different places and doing the `null`-check in a single place instead (so that we don't forget it).
   
   Might be good to get the opinions of others on this question.







[GitHub] [kafka] mjsax commented on pull request #10813: KAFKA-9559: Change default serde to be `null`

Posted by GitBox <gi...@apache.org>.
mjsax commented on pull request #10813:
URL: https://github.com/apache/kafka/pull/10813#issuecomment-854379331


   How confident are we that we covered most cases to avoid an NPE and throw a ConfigException? It seems there are still many places that we need to cover, and I am wondering if we are following the right approach.
   
   For example, instead of getting `keySerde` in some "outer layer" and forwarding a potential `null` that we check later, should we not pass around the `config` instead, call `defaultKeySerde` if we need it, and let it throw the ConfigException directly?
   
   Atm, the code seems rather scattered and it might be better to try to unify it more.
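
The alternative raised here, passing the config around and letting the lookup itself throw, could look roughly like this. It is a sketch under assumptions: the settings map, `ThrowingConfigSketch`, `keySerdeFor`, and the local `ConfigException` are all hypothetical, and the thread later notes a reason the real `defaultKeySerde()` may need to return `null` instead.

```java
import java.util.HashMap;
import java.util.Map;

class ThrowingConfigSketch {
    private final Map<String, Object> settings;

    ThrowingConfigSketch(final Map<String, Object> settings) {
        this.settings = settings;
    }

    // Throws at the point of use, so callers never receive (or forward) a null default.
    Object defaultKeySerde() {
        final Object serde = settings.get("default.key.serde");
        if (serde == null) {
            throw new ConfigException(
                "Please specify a key serde or set one through the default.key.serde config");
        }
        return serde;
    }

    // Hypothetical consumer: consults the config only when no serde was supplied directly.
    static Object keySerdeFor(final Object specificSerde, final ThrowingConfigSketch config) {
        return specificSerde != null ? specificSerde : config.defaultKeySerde();
    }

    public static void main(final String[] args) {
        final ThrowingConfigSketch empty = new ThrowingConfigSketch(new HashMap<>());
        // An explicit serde never touches the config, so a missing default is harmless here.
        System.out.println(keySerdeFor("explicitSerde", empty));
        try {
            keySerdeFor(null, empty);
        } catch (final ConfigException e) {
            System.out.println("failed at point of use: " + e.getMessage());
        }
    }
}

// Local stand-in (assumption) for org.apache.kafka.common.config.ConfigException.
class ConfigException extends RuntimeException {
    ConfigException(final String message) {
        super(message);
    }
}
```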





[GitHub] [kafka] lct45 commented on a change in pull request #10813: KAFKA-9559: Change default serde to be `null`

Posted by GitBox <gi...@apache.org>.
lct45 commented on a change in pull request #10813:
URL: https://github.com/apache/kafka/pull/10813#discussion_r661673686



##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedDeserializer.java
##########
@@ -39,7 +39,10 @@ public ChangedDeserializer(final Deserializer<T> inner) {
     @Override
     public void setIfUnset(final Deserializer<Void> defaultKeyDeserializer, final Deserializer<T> defaultValueDeserializer) {

Review comment:
       This is what I mentioned in my main comment: these methods are ultimately referenced from `MeteredKeyValueStore`, `MeteredSessionStore`, and `MeteredWindowStore`, in methods that have both `StateStoreContext`s and `ProcessorContext`s. We can change it so we pass in the context, but we'd need two sets of methods, one for state store contexts and one for processor contexts. The reason I didn't go ahead and do that was that it seemed a little messy, just a trade-off I guess. WDYT @mjsax? The other alternative I thought of was making `ProcessorContext` and `StateStoreContext` related so that we would only need one set of functions, but that ended up being a decently sized refactor when I tried it.







[GitHub] [kafka] mjsax commented on a change in pull request #10813: KAFKA-9559: Change default serde to be `null`

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #10813:
URL: https://github.com/apache/kafka/pull/10813#discussion_r661897526



##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/KeyAndJoinSideSerializer.java
##########
@@ -36,13 +37,14 @@
         this.keySerializer = keySerializer;
     }
 
+    @SuppressWarnings("unchecked")
     @Override
-    public void setIfUnset(final Serializer<K> defaultKeySerializer, final Serializer<Void> defaultValueSerializer) {
+    public void setIfUnset(final SerdeGetter getter) {
         if (keySerializer == null) {
-            keySerializer = Objects.requireNonNull(defaultKeySerializer, "defaultKeySerializer cannot be null");
+            keySerializer = (Serializer<K>) Objects.requireNonNull(getter.keySerde().serializer(), "defaultKeySerializer cannot be null");

Review comment:
       We can remove `requireNonNull` here, because `getter.keySerde()` would already throw a `ConfigException` if the default serde is null.







[GitHub] [kafka] lct45 commented on a change in pull request #10813: KAFKA-9559: Change default serde to be `null`

Posted by GitBox <gi...@apache.org>.
lct45 commented on a change in pull request #10813:
URL: https://github.com/apache/kafka/pull/10813#discussion_r661675545



##########
File path: streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
##########
@@ -1426,6 +1425,9 @@ private void verifyMaxInFlightRequestPerConnection(final Object maxInFlightReque
     public Serde defaultKeySerde() {
         final Object keySerdeConfigSetting = get(DEFAULT_KEY_SERDE_CLASS_CONFIG);
         try {
+            if (keySerdeConfigSetting ==  null) {
+                return null;

Review comment:
       I don't think we want to. This is called when we create the `AbstractProcessorContext`, which happens before we actually set the serdes. Since we don't check the configs and actually set the serde until later, I don't think we want to throw an exception here.







[GitHub] [kafka] lct45 commented on a change in pull request #10813: KAFKA-9559: Change default serde to be `null`

Posted by GitBox <gi...@apache.org>.
lct45 commented on a change in pull request #10813:
URL: https://github.com/apache/kafka/pull/10813#discussion_r646979245



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java
##########
@@ -60,8 +60,8 @@ public void init(final InternalProcessorContext context) {
         this.context = context;
         final Serializer<?> contextKeySerializer = ProcessorContextUtils.getKeySerializer(context);
         final Serializer<?> contextValueSerializer = ProcessorContextUtils.getValueSerializer(context);
-        keySerializer = prepareKeySerializer(keySerializer, contextKeySerializer, contextValueSerializer);
-        valSerializer = prepareValueSerializer(valSerializer, contextKeySerializer, contextValueSerializer);
+        keySerializer = prepareKeySerializer(keySerializer, contextKeySerializer, contextValueSerializer, this.name());
+        valSerializer = prepareValueSerializer(valSerializer, contextKeySerializer, contextValueSerializer, this.name());

Review comment:
       Hmm yeah, that works too. Either way we're doing null checks after pulling the serializer/deserializer through `ProcessorContextUtils`, so I don't feel like it changes it drastically. I wonder, is it easier for debugging to have the context serializer/deserializer pulled out here, at the same place we eventually set it? Or is it easier to debug if we pull it out where we're potentially throwing an error, down in `WrappingNullableUtils#prepareSerializer`?







[GitHub] [kafka] mjsax commented on a change in pull request #10813: KAFKA-9559: Change default serde to be `null`

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #10813:
URL: https://github.com/apache/kafka/pull/10813#discussion_r645299252



##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedSerializer.java
##########
@@ -40,7 +40,10 @@ public ChangedSerializer(final Serializer<T> inner) {
     @Override
     public void setIfUnset(final Serializer<Void> defaultKeySerializer, final Serializer<T> defaultValueSerializer) {
         if (inner == null) {
-            inner = Objects.requireNonNull(defaultValueSerializer);
+            if (defaultValueSerializer == null) {
+                throw new ConfigException("Please specify a value serde or set one through the default.value.serde config");

Review comment:
       as above

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedDeserializer.java
##########
@@ -39,7 +39,10 @@ public ChangedDeserializer(final Deserializer<T> inner) {
     @Override
     public void setIfUnset(final Deserializer<Void> defaultKeyDeserializer, final Deserializer<T> defaultValueDeserializer) {
         if (inner == null) {
-            inner = Objects.requireNonNull(defaultValueDeserializer);
+            if (defaultValueDeserializer == null) {
+                throw new ConfigException("Please specify a value serde or set one through the default.value.serde config");

Review comment:
       nit: use `StreamsConfig#DEFAULT_VALUE_SERDE_CLASS_CONFIG` 

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ForeignJoinSubscriptionSendProcessorSupplier.java
##########
@@ -83,9 +84,15 @@ public void init(final ProcessorContext context) {
             valueSerdeTopic = valueSerdeTopicSupplier.get();
             // get default key serde if it wasn't supplied directly at construction
             if (foreignKeySerializer == null) {
+                if (context.keySerde() == null || context.keySerde().serializer() == null) {
+                    throw new ConfigException("Please specify a key serde or set one through the default.key.serde config");
+                }
                 foreignKeySerializer = (Serializer<KO>) context.keySerde().serializer();
             }
             if (valueSerializer == null) {
+                if (context.valueSerde() == null || context.valueSerde().serializer() == null) {

Review comment:
       as above

##########
File path: streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
##########
@@ -767,10 +767,9 @@ public void shouldUseNewConfigsWhenPresent() {
     }
 
     @Test
-    public void shouldUseCorrectDefaultsWhenNoneSpecified() {
-        final StreamsConfig config = new StreamsConfig(getStreamsConfig());
-        assertTrue(config.defaultKeySerde() instanceof Serdes.ByteArraySerde);
-        assertTrue(config.defaultValueSerde() instanceof Serdes.ByteArraySerde);

Review comment:
       Instead of removing those, should we replace them with `assertThrows`?

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ForeignJoinSubscriptionSendProcessorSupplier.java
##########
@@ -83,9 +84,15 @@ public void init(final ProcessorContext context) {
             valueSerdeTopic = valueSerdeTopicSupplier.get();
             // get default key serde if it wasn't supplied directly at construction
             if (foreignKeySerializer == null) {
+                if (context.keySerde() == null || context.keySerde().serializer() == null) {

Review comment:
       I don't think a `Serde` should ever wrap `null`. Can we omit the second check?

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionWrapperSerde.java
##########
@@ -115,7 +118,10 @@ public void setIfUnset(final Serializer<K> defaultKeySerializer, final Serialize
         @Override
         public void setIfUnset(final Deserializer<K> defaultKeyDeserializer, final Deserializer<Void> defaultValueDeserializer) {
             if (primaryKeyDeserializer == null) {
-                primaryKeyDeserializer = Objects.requireNonNull(defaultKeyDeserializer);
+                if (defaultKeyDeserializer == null) {
+                    throw new ConfigException("Please specify a key serde or set one through the default.key.serde config");

Review comment:
       as above

##########
File path: streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
##########
@@ -1384,8 +1383,12 @@ public Serde defaultKeySerde() {
             serde.configure(originals(), true);
             return serde;
         } catch (final Exception e) {
-            throw new StreamsException(
-                String.format("Failed to configure key serde %s", keySerdeConfigSetting), e);
+            if (e instanceof NullPointerException) {

Review comment:
       I think it would be cleaner to check if `DEFAULT_KEY_SERDE_CLASS_CONFIG` is set or not (ie, `keySerdeConfigSetting == null`), before calling `getConfiguredInstance()` and just return `null` if it's not set, instead of handling a NPE
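   A minimal sketch of the check-before-instantiate shape suggested here. `SketchConfig` and its map-backed lookup are hypothetical stand-ins for `StreamsConfig` and `getConfiguredInstance()`, not the real Kafka classes; only the control flow is the point.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for StreamsConfig (assumption, not the Kafka class).
final class SketchConfig {
    static final String DEFAULT_KEY_SERDE_CLASS_CONFIG = "default.key.serde";

    private final Map<String, Object> values;

    SketchConfig(final Map<String, Object> values) {
        this.values = new HashMap<>(values);
    }

    // Returns null when the setting is absent, rather than letting a
    // NullPointerException escape from instantiation and handling it afterwards.
    Object defaultKeySerde() {
        final Object keySerdeConfigSetting = values.get(DEFAULT_KEY_SERDE_CLASS_CONFIG);
        if (keySerdeConfigSetting == null) {
            return null;
        }
        try {
            // Stand-in for getConfiguredInstance(): assumes the setting is a Class
            // with a no-argument constructor.
            return ((Class<?>) keySerdeConfigSetting).getDeclaredConstructor().newInstance();
        } catch (final ReflectiveOperationException e) {
            throw new IllegalStateException(
                String.format("Failed to configure key serde %s", keySerdeConfigSetting), e);
        }
    }
}
```

   With this shape the `catch` block only ever sees genuine instantiation failures, so it does not need to inspect the exception type.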

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionResponseWrapperSerde.java
##########
@@ -105,7 +108,10 @@ private SubscriptionResponseWrapperDeserializer(final Deserializer<V> deserializ
         @Override
         public void setIfUnset(final Deserializer<Void> defaultKeyDeserializer, final Deserializer<V> defaultValueDeserializer) {
             if (deserializer == null) {
-                deserializer = Objects.requireNonNull(defaultValueDeserializer);
+                if (defaultValueDeserializer == null) {
+                    throw new ConfigException("Please specify a value serde or set one through the default.value.serde config");

Review comment:
       as above

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/WrappingNullableUtils.java
##########
@@ -37,10 +38,16 @@
         return deserializerToUse;
     }
     @SuppressWarnings("unchecked")
-    private static <T> Serializer<T> prepareSerializer(final Serializer<T> specificSerializer, final Serializer<?> contextKeySerializer, final Serializer<?> contextValueSerializer, final boolean isKey) {
-        Serializer<T> serializerToUse = specificSerializer;
-        if (serializerToUse == null) {
+    private static <T> Serializer<T> prepareSerializer(final Serializer<T> specificSerializer, final Serializer<?> contextKeySerializer, final Serializer<?> contextValueSerializer, final boolean isKey, final String name) {
+        final Serializer<T> serializerToUse;
+        if (specificSerializer == null) {
             serializerToUse = (Serializer<T>) (isKey ? contextKeySerializer : contextValueSerializer);
+        } else {
+            serializerToUse = specificSerializer;
+        }
+        if (serializerToUse == null) {
+            final String serde = isKey ? "key" : "value";
+            throw new ConfigException("Please specify a " + serde + " serde through produced or materialized, or set one through the default." + serde + ".serde config for node " + name);

Review comment:
       as above. use `StreamsConfig#...`

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/WrappingNullableUtils.java
##########
@@ -49,9 +56,15 @@
 
     @SuppressWarnings({"rawtypes", "unchecked"})
     private static <T> Serde<T> prepareSerde(final Serde<T> specificSerde, final Serde<?> contextKeySerde, final Serde<?> contextValueSerde, final boolean isKey) {
-        Serde<T> serdeToUse = specificSerde;
-        if (serdeToUse == null) {
+        final Serde<T> serdeToUse;
+        if (specificSerde == null) {
             serdeToUse = (Serde<T>) (isKey ?  contextKeySerde : contextValueSerde);
+        } else {
+            serdeToUse = specificSerde;
+        }
+        if (serdeToUse == null) {
+            final String serde = isKey ? "key" : "value";
+            throw new ConfigException("Please specify a " + serde + " serde or set one through the default." + serde + ".serde config");

Review comment:
       as above

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionResponseWrapperSerde.java
##########
@@ -57,7 +57,10 @@ private SubscriptionResponseWrapperSerializer(final Serializer<V> serializer) {
         @Override
         public void setIfUnset(final Serializer<Void> defaultKeySerializer, final Serializer<V> defaultValueSerializer) {
             if (serializer == null) {
-                serializer = Objects.requireNonNull(defaultValueSerializer);
+                if (defaultValueSerializer == null) {
+                    throw new ConfigException("Please specify a value serde or set one through the default.value.serde config");

Review comment:
       as above

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordDeserializer.java
##########
@@ -78,6 +79,10 @@
                 throw new StreamsException("Fatal user code error in deserialization error callback", fatalUserException);
             }
 
+            if (deserializationException instanceof ConfigException) {

Review comment:
       Should this check be done before we call `deserializationExceptionHandler.handle` ?

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionWrapperSerde.java
##########
@@ -55,7 +55,10 @@ public SubscriptionWrapperSerde(final Supplier<String> primaryKeySerializationPs
         @Override
         public void setIfUnset(final Serializer<K> defaultKeySerializer, final Serializer<Void> defaultValueSerializer) {
             if (primaryKeySerializer == null) {
-                primaryKeySerializer = Objects.requireNonNull(defaultKeySerializer);
+                if (defaultKeySerializer == null) {
+                    throw new ConfigException("Please specify a key serde or set one through the default.key.serde config");

Review comment:
       as above

##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java
##########
@@ -160,8 +160,8 @@ private void testMetrics(final String builtInMetricsVersion) {
     }
 
     @Test
-    public void testTopologyLevelClassCastException() {
-        // Serdes configuration is missing (default will be used which don't match the DSL below), which will trigger the new exception
+    public void testTopologyLevelConfigException() {

Review comment:
       It seems this test should verify the error for misconfigured (but existing) serdes.
   
   Thus, it seems we should set byte-array serdes on the config and not modify the test in any other way?

##########
File path: streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
##########
@@ -1403,8 +1406,12 @@ public Serde defaultValueSerde() {
             serde.configure(originals(), false);
             return serde;
         } catch (final Exception e) {
-            throw new StreamsException(
-                String.format("Failed to configure value serde %s", valueSerdeConfigSetting), e);
+            if (e instanceof NullPointerException) {

Review comment:
       as above.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNode.java
##########
@@ -51,10 +52,16 @@ public SourceNode(final String name,
     }
 
     KIn deserializeKey(final String topic, final Headers headers, final byte[] data) {
+        if (keyDeserializer == null) {

Review comment:
       In which case can `keyDeserializer` be `null` here?
   
   Would it not be better to ensure that it's never `null` to begin with, by having the check in the `init()` method?
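   One way to picture the "check in `init()`" idea, as a sketch only. `SketchSourceNode` is a hypothetical, simplified node in which a `Function<byte[], String>` stands in for a `Deserializer`, and `IllegalStateException` stands in for whatever exception the real code would throw.

```java
import java.nio.charset.StandardCharsets;
import java.util.function.Function;

// Hypothetical, simplified node (assumption): a Function stands in for a Deserializer.
final class SketchSourceNode {
    private final String name;
    private Function<byte[], String> keyDeserializer; // may be null until init()

    SketchSourceNode(final String name, final Function<byte[], String> keyDeserializer) {
        this.name = name;
        this.keyDeserializer = keyDeserializer;
    }

    // Resolve and validate once, so the per-record path needs no null check.
    void init(final Function<byte[], String> contextKeyDeserializer) {
        if (keyDeserializer == null) {
            keyDeserializer = contextKeyDeserializer;
        }
        if (keyDeserializer == null) {
            throw new IllegalStateException("No key deserializer configured for node " + name);
        }
    }

    String deserializeKey(final byte[] data) {
        return keyDeserializer.apply(data);
    }

    // Example deserializer for the usage below.
    static String utf8(final byte[] data) {
        return new String(data, StandardCharsets.UTF_8);
    }
}
```

   The trade-off is that a misconfiguration surfaces when the node is initialized rather than when the first record arrives.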

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java
##########
@@ -60,8 +60,8 @@ public void init(final InternalProcessorContext context) {
         this.context = context;
         final Serializer<?> contextKeySerializer = ProcessorContextUtils.getKeySerializer(context);
         final Serializer<?> contextValueSerializer = ProcessorContextUtils.getValueSerializer(context);
-        keySerializer = prepareKeySerializer(keySerializer, contextKeySerializer, contextValueSerializer);
-        valSerializer = prepareValueSerializer(valSerializer, contextKeySerializer, contextValueSerializer);
+        keySerializer = prepareKeySerializer(keySerializer, contextKeySerializer, contextValueSerializer, this.name());
+        valSerializer = prepareValueSerializer(valSerializer, contextKeySerializer, contextValueSerializer, this.name());

Review comment:
       It seems better to not pass a potential `null` `contextKeySerializer` or `contextValueSerializer`, but instead forward the `context` directly?







[GitHub] [kafka] mjsax commented on a change in pull request #10813: KAFKA-9559: Change default serde to be `null`

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #10813:
URL: https://github.com/apache/kafka/pull/10813#discussion_r661899168



##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/KeyAndJoinSideDeserializer.java
##########
@@ -31,13 +32,14 @@
         this.keyDeserializer = keyDeserializer;
     }
 
+    @SuppressWarnings("unchecked")
     @Override
-    public void setIfUnset(final Deserializer<K> defaultKeyDeserializer, final Deserializer<Void> defaultValueDeserializer) {
+    public void setIfUnset(final SerdeGetter getter) {
         if (keyDeserializer == null) {
-            keyDeserializer = Objects.requireNonNull(defaultKeyDeserializer, "defaultKeyDeserializer cannot be null");
+            keyDeserializer = (Deserializer<K>) Objects.requireNonNull(getter.keySerde().deserializer(), "defaultKeyDeserializer cannot be null");

Review comment:
       `requireNonNull` seems not to be necessary







[GitHub] [kafka] mjsax commented on a change in pull request #10813: KAFKA-9559: Change default serde to be `null`

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #10813:
URL: https://github.com/apache/kafka/pull/10813#discussion_r661899168



##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/KeyAndJoinSideDeserializer.java
##########
@@ -31,13 +32,14 @@
         this.keyDeserializer = keyDeserializer;
     }
 
+    @SuppressWarnings("unchecked")
     @Override
-    public void setIfUnset(final Deserializer<K> defaultKeyDeserializer, final Deserializer<Void> defaultValueDeserializer) {
+    public void setIfUnset(final SerdeGetter getter) {
         if (keyDeserializer == null) {
-            keyDeserializer = Objects.requireNonNull(defaultKeyDeserializer, "defaultKeyDeserializer cannot be null");
+            keyDeserializer = (Deserializer<K>) Objects.requireNonNull(getter.keySerde().deserializer(), "defaultKeyDeserializer cannot be null");

Review comment:
       as above







[GitHub] [kafka] mjsax commented on pull request #10813: KAFKA-9559: Change default serde to be `null`

Posted by GitBox <gi...@apache.org>.
mjsax commented on pull request #10813:
URL: https://github.com/apache/kafka/pull/10813#issuecomment-854379331


   How confident are we that we covered most cases to avoid NPEs and throw a `ConfigException` instead? It seems there are still many places that we need to cover, and I am wondering if we are following the right approach.
   
   For example, instead of getting `keySerde` in some "outer layer" and forwarding a potential `null` that we check later, should we not pass around the `config` instead, call `defaultKeySerde` if we need it, and let it throw the `ConfigException` directly?
   
   Atm, the code seems rather scattered and it might be better to try to unify it more.





[GitHub] [kafka] lct45 commented on a change in pull request #10813: KAFKA-9559: Change default serde to be `null`

Posted by GitBox <gi...@apache.org>.
lct45 commented on a change in pull request #10813:
URL: https://github.com/apache/kafka/pull/10813#discussion_r648383874



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordDeserializer.java
##########
@@ -78,6 +79,10 @@
                 throw new StreamsException("Fatal user code error in deserialization error callback", fatalUserException);
             }
 
+            if (deserializationException instanceof ConfigException) {

Review comment:
       Makes sense, I'll move the check so it runs before the handler is called.
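   The ordering being agreed on here can be sketched as follows. Everything in the block is hypothetical: `SketchConfigException` stands in for Kafka's `ConfigException`, and the `Function`-based handler stands in for the deserialization exception handler. The only point is that a configuration error is rethrown before the handler is consulted.

```java
import java.util.function.Function;

final class SketchRecordDeserializer {
    // Hypothetical stand-in for Kafka's ConfigException (assumption).
    static final class SketchConfigException extends RuntimeException {
        SketchConfigException(final String message) {
            super(message);
        }
    }

    enum Response { CONTINUE, FAIL }

    // Deserializes raw bytes; configuration errors bypass the handler entirely.
    static <T> T deserialize(final byte[] raw,
                             final Function<byte[], T> deserializer,
                             final Function<Exception, Response> handler) {
        try {
            return deserializer.apply(raw);
        } catch (final Exception e) {
            if (e instanceof SketchConfigException) {
                throw (SketchConfigException) e; // never offered to the handler
            }
            if (handler.apply(e) == Response.FAIL) {
                throw new IllegalStateException("Deserialization failed", e);
            }
            return null; // handler chose to skip the record
        }
    }
}
```

   Checking first means a handler configured to skip bad records cannot accidentally swallow a missing-serde error.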







[GitHub] [kafka] mjsax commented on a change in pull request #10813: KAFKA-9559: Change default serde to be `null`

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #10813:
URL: https://github.com/apache/kafka/pull/10813#discussion_r661045444



##########
File path: streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
##########
@@ -1426,6 +1425,9 @@ private void verifyMaxInFlightRequestPerConnection(final Object maxInFlightReque
     public Serde defaultKeySerde() {
         final Object keySerdeConfigSetting = get(DEFAULT_KEY_SERDE_CLASS_CONFIG);
         try {
+            if (keySerdeConfigSetting ==  null) {
+                return null;

Review comment:
       It's been a while, but I am still wondering if we should throw a `ConfigException` directly instead of returning `null`. This would then be the only place in the code where we throw a `ConfigException` for this case. Below, there is some repetitive code that calls `defaultKeySerde` (or `defaultValueSerde`) and throws a `ConfigException` if `null` is returned.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordDeserializer.java
##########
@@ -69,6 +70,9 @@
                 Optional.empty()
             );
         } catch (final Exception deserializationException) {
+            if (deserializationException instanceof ConfigException) {

Review comment:
       Where does the `ConfigException` come from?

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/WrappingNullableUtils.java
##########
@@ -27,20 +31,37 @@
 public class WrappingNullableUtils {
 
     @SuppressWarnings("unchecked")
-    private static <T> Deserializer<T> prepareDeserializer(final Deserializer<T> specificDeserializer, final Deserializer<?> contextKeyDeserializer, final Deserializer<?> contextValueDeserializer, final boolean isKey) {
-        Deserializer<T> deserializerToUse = specificDeserializer;
-        if (deserializerToUse == null) {
+    private static <T> Deserializer<T> prepareDeserializer(final Deserializer<T> specificDeserializer, final ProcessorContext context, final boolean isKey, final String name) {
+        final Deserializer<?> contextKeyDeserializer = context.keySerde() == null ? null : context.keySerde().deserializer();
+        final Deserializer<?> contextValueDeserializer = context.valueSerde() == null ? null : context.valueSerde().deserializer();
+        final Deserializer<T> deserializerToUse;
+
+        if (specificDeserializer == null) {
             deserializerToUse = (Deserializer<T>) (isKey ? contextKeyDeserializer : contextValueDeserializer);
+        } else {
+            deserializerToUse = specificDeserializer;
+        }
+        if (deserializerToUse == null) {
+            final String serde = isKey ? "key" : "value";
+            throw new ConfigException("Failed to create deserializers. Please specify a " + serde + " serde through produced or materialized, or set one through StreamsConfig#DEFAULT_" + serde.toUpperCase(Locale.ROOT) + "_SERDE_CLASS_CONFIG for node " + name);

Review comment:
       Following the comments from above. If we throw in `context.keySerde()` we can avoid this redundant code. Of course, we should only call `context.keySerde()` if `specificDeserializer == null` for this case.
   
   Similar below.
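   A sketch of what "throw in `context.keySerde()` and only call it when needed" could look like. `SketchContext` and `SketchConfigException` are hypothetical stand-ins, and a plain `String` takes the place of a serde instance; this is not the code that was merged.

```java
final class SketchSerdeResolver {
    // Hypothetical stand-in for Kafka's ConfigException (assumption).
    static final class SketchConfigException extends RuntimeException {
        SketchConfigException(final String message) {
            super(message);
        }
    }

    // Hypothetical context whose accessor throws instead of returning null.
    static final class SketchContext {
        private final String keySerde; // a String stands in for a Serde instance

        SketchContext(final String keySerde) {
            this.keySerde = keySerde;
        }

        String keySerde() {
            if (keySerde == null) {
                throw new SketchConfigException(
                    "Please specify a key serde or set one through StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG");
            }
            return keySerde;
        }
    }

    // The context is consulted only when no specific serde was supplied, so an
    // unset default is harmless for callers that bring their own serde.
    static String prepareKeySerde(final String specificSerde, final SketchContext context) {
        return specificSerde != null ? specificSerde : context.keySerde();
    }
}
```

   Callers then need no null check of their own, because the single throwing accessor covers every fallback path.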

##########
File path: streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
##########
@@ -1445,6 +1447,9 @@ public Serde defaultKeySerde() {
     public Serde defaultValueSerde() {
         final Object valueSerdeConfigSetting = get(DEFAULT_VALUE_SERDE_CLASS_CONFIG);
         try {
+            if (valueSerdeConfigSetting == null) {
+                return null;

Review comment:
       As above

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedDeserializer.java
##########
@@ -39,7 +39,10 @@ public ChangedDeserializer(final Deserializer<T> inner) {
     @Override
     public void setIfUnset(final Deserializer<Void> defaultKeyDeserializer, final Deserializer<T> defaultValueDeserializer) {

Review comment:
       Could we pass the context instead of both deserializers, and simplify to (relying on the context to throw if necessary):
   ```
   if (inner == null) {
     // valueSerde() returns a Serde, so take its deserializer
     inner = (Deserializer<T>) context.valueSerde().deserializer();
   }
   ```

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/CombinedKeySchema.java
##########
@@ -54,10 +55,14 @@ public CombinedKeySchema(final Supplier<String> foreignKeySerdeTopicSupplier,
     public void init(final ProcessorContext context) {
         primaryKeySerdeTopic = undecoratedPrimaryKeySerdeTopicSupplier.get();
         foreignKeySerdeTopic = undecoratedForeignKeySerdeTopicSupplier.get();
-        primaryKeySerializer = primaryKeySerializer == null ? (Serializer<K>) context.keySerde().serializer() : primaryKeySerializer;
-        primaryKeyDeserializer = primaryKeyDeserializer == null ? (Deserializer<K>) context.keySerde().deserializer() : primaryKeyDeserializer;
-        foreignKeySerializer = foreignKeySerializer == null ? (Serializer<KO>) context.keySerde().serializer() : foreignKeySerializer;
-        foreignKeyDeserializer = foreignKeyDeserializer == null ? (Deserializer<KO>) context.keySerde().deserializer() : foreignKeyDeserializer;
+        try {
+            primaryKeySerializer = primaryKeySerializer == null ? (Serializer<K>) context.keySerde().serializer() : primaryKeySerializer;
+            primaryKeyDeserializer = primaryKeyDeserializer == null ? (Deserializer<K>) context.keySerde().deserializer() : primaryKeyDeserializer;
+            foreignKeySerializer = foreignKeySerializer == null ? (Serializer<KO>) context.keySerde().serializer() : foreignKeySerializer;
+            foreignKeyDeserializer = foreignKeyDeserializer == null ? (Deserializer<KO>) context.keySerde().deserializer() : foreignKeyDeserializer;
+        } catch (final NullPointerException e) {
+            throw new ConfigException("Please specify a key serde or set one through StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG");

Review comment:
       If we let `context.keySerde` throw, there won't be an NPE, and we also don't need to throw a `ConfigException` here explicitly; we can delegate to the `keySerde()` / `valueSerde()` methods.

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ForeignJoinSubscriptionSendProcessorSupplier.java
##########
@@ -83,9 +84,15 @@ public void init(final ProcessorContext context) {
             valueSerdeTopic = valueSerdeTopicSupplier.get();
             // get default key serde if it wasn't supplied directly at construction
             if (foreignKeySerializer == null) {
+                if (context.keySerde() == null) {
+                    throw new ConfigException("Please specify a key serde or set one through StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG");

Review comment:
       as above (same below)

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/WrappingNullableSerde.java
##########
@@ -60,9 +60,12 @@ public void close() {
     }
 
     public void setIfUnset(final Serde<InnerK> defaultKeySerde, final Serde<InnerV> defaultValueSerde) {
-        Objects.requireNonNull(defaultKeySerde);
-        Objects.requireNonNull(defaultValueSerde);
-        serializer.setIfUnset(defaultKeySerde.serializer(), defaultValueSerde.serializer());
-        deserializer.setIfUnset(defaultKeySerde.deserializer(), defaultValueSerde.deserializer());
+        if (defaultKeySerde != null && defaultValueSerde != null) {

Review comment:
       Why is this a single condition? Should we not handle keySerde and valueSerde independently? -- Atm, if one is null, but the other is not null, we would execute the `else` and set _both_ to `null`; is this intentional?
   
   Also, following the comment from above, should we pass in the context into this method?
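   To illustrate the question about handling the two serdes independently: the `else` branch is not visible in the hunk above, so the following is only a sketch of the independent variant under that assumption. `SketchWrappingSerde` is hypothetical, and `String` fields stand in for the inner serdes.

```java
// Hypothetical wrapper (assumption): Strings stand in for inner key/value serdes.
final class SketchWrappingSerde {
    private String keySerde;
    private String valueSerde;

    // Each side falls back to its own default; a missing default on one side
    // does not affect the other.
    void setIfUnset(final String defaultKeySerde, final String defaultValueSerde) {
        if (keySerde == null && defaultKeySerde != null) {
            keySerde = defaultKeySerde;
        }
        if (valueSerde == null && defaultValueSerde != null) {
            valueSerde = defaultValueSerde;
        }
    }

    String keySerde() {
        return keySerde;
    }

    String valueSerde() {
        return valueSerde;
    }
}
```

   With a combined `&&` condition, a configured key default would be ignored whenever the value default is missing; the two separate checks avoid that coupling.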

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedSerializer.java
##########
@@ -40,7 +40,10 @@ public ChangedSerializer(final Serializer<T> inner) {
     @Override
     public void setIfUnset(final Serializer<Void> defaultKeySerializer, final Serializer<T> defaultValueSerializer) {
         if (inner == null) {
-            inner = Objects.requireNonNull(defaultValueSerializer);
+            if (defaultValueSerializer == null) {

Review comment:
       as above (to avoid redundant code)







[GitHub] [kafka] mjsax merged pull request #10813: KAFKA-9559: Change default serde to be `null`

Posted by GitBox <gi...@apache.org>.
mjsax merged pull request #10813:
URL: https://github.com/apache/kafka/pull/10813


   





[GitHub] [kafka] lct45 commented on pull request #10813: KAFKA-9559: Change default serde to be `null`

Posted by GitBox <gi...@apache.org>.
lct45 commented on pull request #10813:
URL: https://github.com/apache/kafka/pull/10813#issuecomment-858849165


   Okay, I did some digging: `defaultKeySerde` and `defaultValueSerde` are only called from the `init` of `AbstractProcessorContext`. I checked all the places where we call `AbstractProcessorContext#keySerde()` and `AbstractProcessorContext#valueSerde()` to make sure we're catching all the potential NPEs, and I am fairly confident that we're ok.
   
   I did some streamlining, so now we throw the `ConfigException` right after we access `AbstractProcessorContext#keySerde()` / `valueSerde()`. That way we aren't passing nulls around, and there's a clearer link between where an error is thrown and the method call that caused it. The one place this wasn't possible was in creating state stores. Right now, we pass around `context.keySerde()` and `context.valueSerde()` rather than just the `context` in `MeteredKeyValueStore`, `MeteredSessionStore`, and `MeteredWindowStore`. The tricky part with moving to passing around the context is that we need to accept two types of context, a `ProcessorContext` and a `StateStoreContext`. I'm open to either leaving these calls less streamlined than everything else, or duplicating code in `WrappingNullableUtils` to accept both types of context. Thoughts, @mjsax @ableegoldman?
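   One possible way around the two-context problem is a small adapter that hides which kind of context it wraps. Diff hunks elsewhere in this thread reference a `SerdeGetter`; the block below is only a sketch of that kind of adapter, not the actual class. The two `Sketch...Context` interfaces are hypothetical, and `String` stands in for a serde.

```java
import java.util.function.Supplier;

// Hypothetical stand-ins for the two unrelated context types (assumption).
interface SketchProcessorContext {
    String keySerde();
    String valueSerde();
}

interface SketchStateStoreContext {
    String keySerde();
    String valueSerde();
}

// Adapter: one constructor per context type, one uniform accessor pair.
final class SketchSerdeGetter {
    private final Supplier<String> keySerdeSupplier;
    private final Supplier<String> valueSerdeSupplier;

    SketchSerdeGetter(final SketchProcessorContext context) {
        keySerdeSupplier = context::keySerde;
        valueSerdeSupplier = context::valueSerde;
    }

    SketchSerdeGetter(final SketchStateStoreContext context) {
        keySerdeSupplier = context::keySerde;
        valueSerdeSupplier = context::valueSerde;
    }

    String keySerde() {
        return keySerdeSupplier.get();
    }

    String valueSerde() {
        return valueSerdeSupplier.get();
    }
}
```

   Helper methods can then take the adapter as their only context parameter, which avoids duplicating each helper per context type.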





[GitHub] [kafka] mjsax commented on a change in pull request #10813: KAFKA-9559: Change default serde to be `null`

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #10813:
URL: https://github.com/apache/kafka/pull/10813#discussion_r661896406



##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/WrappingNullableUtils.java
##########
@@ -27,70 +32,83 @@
 public class WrappingNullableUtils {
 
     @SuppressWarnings("unchecked")
-    private static <T> Deserializer<T> prepareDeserializer(final Deserializer<T> specificDeserializer, final Deserializer<?> contextKeyDeserializer, final Deserializer<?> contextValueDeserializer, final boolean isKey) {
-        Deserializer<T> deserializerToUse = specificDeserializer;
-        if (deserializerToUse == null) {
+    private static <T> Deserializer<T> prepareDeserializer(final Deserializer<T> specificDeserializer, final ProcessorContext context, final boolean isKey, final String name) {
+        final Deserializer<T> deserializerToUse;
+
+        if (specificDeserializer == null) {
+            final Deserializer<?> contextKeyDeserializer = context.keySerde().deserializer();
+            final Deserializer<?> contextValueDeserializer = context.valueSerde().deserializer();
             deserializerToUse = (Deserializer<T>) (isKey ? contextKeyDeserializer : contextValueDeserializer);
         } else {
-            initNullableDeserializer(deserializerToUse, contextKeyDeserializer, contextValueDeserializer);
+            deserializerToUse = specificDeserializer;
+            initNullableDeserializer(deserializerToUse, new SerdeGetter(context));
         }
         return deserializerToUse;
     }
     @SuppressWarnings("unchecked")
-    private static <T> Serializer<T> prepareSerializer(final Serializer<T> specificSerializer, final Serializer<?> contextKeySerializer, final Serializer<?> contextValueSerializer, final boolean isKey) {
-        Serializer<T> serializerToUse = specificSerializer;
-        if (serializerToUse == null) {
+    private static <T> Serializer<T> prepareSerializer(final Serializer<T> specificSerializer, final ProcessorContext context, final boolean isKey, final String name) {
+        final Serializer<T> serializerToUse;
+        if (specificSerializer == null) {
+            final Serializer<?> contextKeySerializer = context.keySerde().serializer();
+            final Serializer<?> contextValueSerializer = context.valueSerde().serializer();
             serializerToUse = (Serializer<T>) (isKey ? contextKeySerializer : contextValueSerializer);
         } else {
-            initNullableSerializer(serializerToUse, contextKeySerializer, contextValueSerializer);
+            serializerToUse = specificSerializer;
+            initNullableSerializer(serializerToUse, new SerdeGetter(context));
         }
         return serializerToUse;
     }
 
     @SuppressWarnings({"rawtypes", "unchecked"})
-    private static <T> Serde<T> prepareSerde(final Serde<T> specificSerde, final Serde<?> contextKeySerde, final Serde<?> contextValueSerde, final boolean isKey) {
-        Serde<T> serdeToUse = specificSerde;
+    private static <T> Serde<T> prepareSerde(final Serde<T> specificSerde, final SerdeGetter getter, final boolean isKey) {
+        final Serde<T> serdeToUse;
+        if (specificSerde == null) {
+            serdeToUse = (Serde<T>) (isKey ?  getter.keySerde() : getter.valueSerde());
+        } else {
+            serdeToUse = specificSerde;
+        }
         if (serdeToUse == null) {

Review comment:
       I think this condition cannot be true any longer?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] mjsax commented on a change in pull request #10813: KAFKA-9559: Change default serde to be `null`

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #10813:
URL: https://github.com/apache/kafka/pull/10813#discussion_r645306551



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java
##########
@@ -60,8 +60,8 @@ public void init(final InternalProcessorContext context) {
         this.context = context;
         final Serializer<?> contextKeySerializer = ProcessorContextUtils.getKeySerializer(context);
         final Serializer<?> contextValueSerializer = ProcessorContextUtils.getValueSerializer(context);
-        keySerializer = prepareKeySerializer(keySerializer, contextKeySerializer, contextValueSerializer);
-        valSerializer = prepareValueSerializer(valSerializer, contextKeySerializer, contextValueSerializer);
+        keySerializer = prepareKeySerializer(keySerializer, contextKeySerializer, contextValueSerializer, this.name());
+        valSerializer = prepareValueSerializer(valSerializer, contextKeySerializer, contextValueSerializer, this.name());

Review comment:
       It seems better to not pass a potential `null` `contextKeySerializer` or `contextValueSerializer`, but instead forward the `context` directly?
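
A minimal sketch of the suggestion above, not the actual `SinkNode` or `WrappingNullableUtils` code. The `Context` interface, the use of `Function<Object, byte[]>` as a stand-in for a serializer, and the `IllegalStateException` are all assumptions made for illustration. The point is that the context default is only looked up (and only needs to exist) when no node-specific serializer was given:

```java
import java.util.function.Function;

final class ContextForwardingSketch {

    /** Hypothetical stand-in for a processor context that exposes default serializers. */
    interface Context {
        Function<Object, byte[]> keySerializer();   // may return null if no default is configured
        Function<Object, byte[]> valueSerializer(); // may return null if no default is configured
    }

    /**
     * Returns the node-specific serializer if present. Otherwise consults the
     * context and fails only if the required default is missing.
     */
    static Function<Object, byte[]> prepareSerializer(final Function<Object, byte[]> specific,
                                                      final Context context,
                                                      final boolean isKey,
                                                      final String nodeName) {
        if (specific != null) {
            return specific; // the context defaults are never touched on this path
        }
        final Function<Object, byte[]> fallback = isKey ? context.keySerializer() : context.valueSerializer();
        if (fallback == null) {
            throw new IllegalStateException(
                "No " + (isKey ? "key" : "value") + " serializer available for node " + nodeName);
        }
        return fallback;
    }
}
```

With this shape, the caller never has to resolve a possibly-`null` default up front, and the error message can name the node that is missing a serializer.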







[GitHub] [kafka] mjsax commented on a change in pull request #10813: KAFKA-9559: Change default serde to be `null`

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #10813:
URL: https://github.com/apache/kafka/pull/10813#discussion_r661898917



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/SerdeGetter.java
##########
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals;
+
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.streams.processor.StateStoreContext;
+
+/**
+ * Allows serde access across different context types.
+ */
+public class SerdeGetter {
+
+    private final org.apache.kafka.streams.processor.ProcessorContext pc;

Review comment:
       nit:
   `pc` -> `oldProcessorContext`
   `apiPc` -> `newProcessorContext`
   `ssc` -> `stateStoreContext`
   
   below `c` -> `context` 
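
For illustration, a sketch of how a getter with these field names might read. Only the three field names come from the nit above; the nested interfaces are hypothetical stand-ins for the real context types, serdes are modeled as plain strings, and the constructor and dispatch logic are assumptions rather than the code in the PR:

```java
import java.util.Objects;

final class SerdeGetterSketch {

    /** Hypothetical stand-ins for the three context types. */
    interface OldProcessorContext { String keySerde(); String valueSerde(); }
    interface NewProcessorContext { String keySerde(); String valueSerde(); }
    interface StateStoreContext { String keySerde(); String valueSerde(); }

    // Exactly one of these is non-null, depending on which constructor was used.
    private final OldProcessorContext oldProcessorContext;
    private final NewProcessorContext newProcessorContext;
    private final StateStoreContext stateStoreContext;

    SerdeGetterSketch(final OldProcessorContext context) {
        this.oldProcessorContext = Objects.requireNonNull(context);
        this.newProcessorContext = null;
        this.stateStoreContext = null;
    }

    SerdeGetterSketch(final NewProcessorContext context) {
        this.oldProcessorContext = null;
        this.newProcessorContext = Objects.requireNonNull(context);
        this.stateStoreContext = null;
    }

    SerdeGetterSketch(final StateStoreContext context) {
        this.oldProcessorContext = null;
        this.newProcessorContext = null;
        this.stateStoreContext = Objects.requireNonNull(context);
    }

    /** Delegates to whichever context this getter wraps. */
    String keySerde() {
        if (oldProcessorContext != null) {
            return oldProcessorContext.keySerde();
        }
        return newProcessorContext != null ? newProcessorContext.keySerde() : stateStoreContext.keySerde();
    }

    /** Delegates to whichever context this getter wraps. */
    String valueSerde() {
        if (oldProcessorContext != null) {
            return oldProcessorContext.valueSerde();
        }
        return newProcessorContext != null ? newProcessorContext.valueSerde() : stateStoreContext.valueSerde();
    }
}
```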







[GitHub] [kafka] mjsax commented on a change in pull request #10813: KAFKA-9559: Change default serde to be `null`

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #10813:
URL: https://github.com/apache/kafka/pull/10813#discussion_r647929736



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordDeserializer.java
##########
@@ -78,6 +79,10 @@
                 throw new StreamsException("Fatal user code error in deserialization error callback", fatalUserException);
             }
 
+            if (deserializationException instanceof ConfigException) {

Review comment:
       My thought was that for a `ConfigException` we are doomed to fail anyway, so it does not seem to make sense to call the handler and allow the user to "swallow" the exception by returning `CONTINUE`?
   
   Also, even if the user does return `CONTINUE`, it seems we would ignore it, rethrow the `ConfigException`, and die anyway, which seems to defeat the purpose of calling the exception handler to begin with?
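
A rough sketch of that ordering, assuming stand-in types throughout: `ConfigError` is a hypothetical replacement for `ConfigException`, `Response` for the handler's return value, and `deserialize` is not the real `RecordDeserializer` method. The configuration error is rethrown before the handler is ever consulted, so the handler cannot swallow it:

```java
import java.util.function.Function;

final class DeserializationErrorSketch {

    /** Hypothetical stand-in for a configuration error. */
    static final class ConfigError extends RuntimeException {
        ConfigError(final String message) {
            super(message);
        }
    }

    /** Hypothetical stand-in for the handler's decision. */
    enum Response { CONTINUE, FAIL }

    /**
     * Deserializes the raw bytes. Returns null when the handler chose to skip the record.
     */
    static <T> T deserialize(final Function<byte[], T> deserializer,
                             final byte[] raw,
                             final Function<Exception, Response> handler) {
        try {
            return deserializer.apply(raw);
        } catch (final ConfigError fatal) {
            throw fatal; // misconfiguration: rethrow without asking the handler
        } catch (final RuntimeException recoverable) {
            if (handler.apply(recoverable) == Response.FAIL) {
                throw new IllegalStateException("Deserialization failed", recoverable);
            }
            return null; // handler returned CONTINUE, so the record is skipped
        }
    }
}
```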







[GitHub] [kafka] mjsax commented on a change in pull request #10813: KAFKA-9559: Change default serde to be `null`

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #10813:
URL: https://github.com/apache/kafka/pull/10813#discussion_r661680528



##########
File path: streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
##########
@@ -1426,6 +1425,9 @@ private void verifyMaxInFlightRequestPerConnection(final Object maxInFlightReque
     public Serde defaultKeySerde() {
         final Object keySerdeConfigSetting = get(DEFAULT_KEY_SERDE_CLASS_CONFIG);
         try {
+            if (keySerdeConfigSetting ==  null) {
+                return null;

Review comment:
       Well, I think one could argue that we might want to change the `AbstractProcessorContext`? Instead of making the call in the constructor, we could update the `keySerde()` and `valueSerde()` methods to make the call:
   ```
   public Serde<?> keySerde() {
       if (keySerde == null) {
           keySerde = config.defaultKeySerde();
       }
       return keySerde;
   }
   ```
   
   Thoughts?







[GitHub] [kafka] lct45 commented on a change in pull request #10813: KAFKA-9559: Change default serde to be `null`

Posted by GitBox <gi...@apache.org>.
lct45 commented on a change in pull request #10813:
URL: https://github.com/apache/kafka/pull/10813#discussion_r661795305



##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/WrappingNullableUtils.java
##########
@@ -27,70 +32,83 @@
 public class WrappingNullableUtils {
 
     @SuppressWarnings("unchecked")
-    private static <T> Deserializer<T> prepareDeserializer(final Deserializer<T> specificDeserializer, final Deserializer<?> contextKeyDeserializer, final Deserializer<?> contextValueDeserializer, final boolean isKey) {
-        Deserializer<T> deserializerToUse = specificDeserializer;
-        if (deserializerToUse == null) {
+    private static <T> Deserializer<T> prepareDeserializer(final Deserializer<T> specificDeserializer, final ProcessorContext context, final boolean isKey, final String name) {
+        final Deserializer<T> deserializerToUse;
+
+        if (specificDeserializer == null) {
+            final Deserializer<?> contextKeyDeserializer = context.keySerde().deserializer();
+            final Deserializer<?> contextValueDeserializer = context.valueSerde().deserializer();
             deserializerToUse = (Deserializer<T>) (isKey ? contextKeyDeserializer : contextValueDeserializer);
         } else {
-            initNullableDeserializer(deserializerToUse, contextKeyDeserializer, contextValueDeserializer);
+            deserializerToUse = specificDeserializer;
         }
+        initNullableDeserializer(deserializerToUse, context);
         return deserializerToUse;
     }
     @SuppressWarnings("unchecked")
-    private static <T> Serializer<T> prepareSerializer(final Serializer<T> specificSerializer, final Serializer<?> contextKeySerializer, final Serializer<?> contextValueSerializer, final boolean isKey) {
-        Serializer<T> serializerToUse = specificSerializer;
-        if (serializerToUse == null) {
+    private static <T> Serializer<T> prepareSerializer(final Serializer<T> specificSerializer, final ProcessorContext context, final boolean isKey, final String name) {
+        final Serializer<T> serializerToUse;
+        if (specificSerializer == null) {
+            final Serializer<?> contextKeySerializer = context.keySerde().serializer();
+            final Serializer<?> contextValueSerializer = context.valueSerde().serializer();
             serializerToUse = (Serializer<T>) (isKey ? contextKeySerializer : contextValueSerializer);
         } else {
-            initNullableSerializer(serializerToUse, contextKeySerializer, contextValueSerializer);
+            serializerToUse = specificSerializer;
         }
+        initNullableSerializer(serializerToUse, context);
         return serializerToUse;
     }
 
     @SuppressWarnings({"rawtypes", "unchecked"})
-    private static <T> Serde<T> prepareSerde(final Serde<T> specificSerde, final Serde<?> contextKeySerde, final Serde<?> contextValueSerde, final boolean isKey) {
-        Serde<T> serdeToUse = specificSerde;
+    private static <T> Serde<T> prepareSerde(final Serde<T> specificSerde, final SerdeGetter getter, final boolean isKey) {
+        final Serde<T> serdeToUse;
+        if (specificSerde == null) {
+            serdeToUse = (Serde<T>) (isKey ?  getter.keySerde() : getter.valueSerde());
+        } else {
+            serdeToUse = specificSerde;
+        }
         if (serdeToUse == null) {
-            serdeToUse = (Serde<T>) (isKey ?  contextKeySerde : contextValueSerde);
+            final String serde = isKey ? "key" : "value";
+            throw new ConfigException("Please specify a " + serde + " serde or set one through StreamsConfig#DEFAULT_" + serde.toUpperCase(Locale.ROOT) + "_SERDE_CLASS_CONFIG");
         } else if (serdeToUse instanceof WrappingNullableSerde) {
-            ((WrappingNullableSerde) serdeToUse).setIfUnset(contextKeySerde, contextValueSerde);
+            ((WrappingNullableSerde) serdeToUse).setIfUnset(getter);

Review comment:
       This is causing issues - even though `contextKeySerde` was `Serde<?>`, some of the underlying `setIfUnset` methods are complaining when I access the serde straight from the `getter`.
   
   In `StreamStreamJoinIntegrationTest#testOuterRepartitioned` I get a cast exception:
   `class org.apache.kafka.common.serialization.LongDeserializer cannot be cast to class org.apache.kafka.common.serialization.Serializer (org.apache.kafka.common.serialization.LongDeserializer and org.apache.kafka.common.serialization.Serializer are in unnamed module of loader 'app')
   `
   
   It seems that because we aren't passing in the serde directly, the generics aren't resolving correctly. This issue only shows up in `StreamStreamJoinIntegrationTest`, so maybe it's something with the test setup? But I'm kind of at a loss about how to get it to stop complaining ☹️







[GitHub] [kafka] lct45 commented on a change in pull request #10813: KAFKA-9559: Change default serde to be `null`

Posted by GitBox <gi...@apache.org>.
lct45 commented on a change in pull request #10813:
URL: https://github.com/apache/kafka/pull/10813#discussion_r661680517



##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/WrappingNullableSerde.java
##########
@@ -60,9 +60,12 @@ public void close() {
     }
 
     public void setIfUnset(final Serde<InnerK> defaultKeySerde, final Serde<InnerV> defaultValueSerde) {
-        Objects.requireNonNull(defaultKeySerde);
-        Objects.requireNonNull(defaultValueSerde);
-        serializer.setIfUnset(defaultKeySerde.serializer(), defaultValueSerde.serializer());
-        deserializer.setIfUnset(defaultKeySerde.deserializer(), defaultValueSerde.deserializer());
+        if (defaultKeySerde != null && defaultValueSerde != null) {

Review comment:
       The context issue is the same as above: we need to be able to handle two processor types. The reason I left these lumped together is that in the call to `setIfUnset` we pass in the `serializer`/`deserializer`, so if we allowed either default to be null we'd get an NPE at the method call. I tried passing in just the `default` for both but ran into issues with the generics. I can refactor to allow one to be null.
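
One possible shape for such a refactor, sketched under assumptions: `ValueOnlyWrapper` is hypothetical, serializers are modeled as plain strings, and `Supplier` stands in for whatever object hands out the defaults. Because each default is fetched only at the point where it is actually needed, a missing key default does not matter to a wrapper that only uses the value default:

```java
import java.util.Objects;
import java.util.function.Supplier;

final class SetIfUnsetSketch {

    /** Hypothetical wrapper that only ever needs the default value serializer. */
    static final class ValueOnlyWrapper {
        private String inner; // stand-in for an inner serializer; null means "unset"

        ValueOnlyWrapper(final String inner) {
            this.inner = inner;
        }

        /** Looks up a default only if the inner serializer is unset. */
        void setIfUnset(final Supplier<String> defaultKey, final Supplier<String> defaultValue) {
            if (inner == null) {
                // defaultKey is never evaluated here, so a missing key default is harmless
                inner = Objects.requireNonNull(defaultValue.get(), "default value serializer cannot be null");
            }
        }

        String inner() {
            return inner;
        }
    }
}
```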







[GitHub] [kafka] mjsax commented on a change in pull request #10813: KAFKA-9559: Change default serde to be `null`

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #10813:
URL: https://github.com/apache/kafka/pull/10813#discussion_r661897674



##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/LeftOrRightValueDeserializer.java
##########
@@ -35,17 +36,17 @@ public LeftOrRightValueDeserializer(final Deserializer<V1> leftDeserializer, fin
 
     @SuppressWarnings("unchecked")
     @Override
-    public void setIfUnset(final Deserializer<Void> defaultKeyDeserializer, final Deserializer<Object> defaultValueDeserializer) {
+    public void setIfUnset(final SerdeGetter getter) {
         if (leftDeserializer == null) {
-            leftDeserializer = (Deserializer<V1>) Objects.requireNonNull(defaultValueDeserializer, "defaultValueDeserializer cannot be null");
+            leftDeserializer = (Deserializer<V1>) Objects.requireNonNull(getter.valueSerde().deserializer(), "defaultValueDeserializer cannot be null");

Review comment:
       As above: `requireNonNull` is no longer necessary.

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/LeftOrRightValueDeserializer.java
##########
@@ -35,17 +36,17 @@ public LeftOrRightValueDeserializer(final Deserializer<V1> leftDeserializer, fin
 
     @SuppressWarnings("unchecked")
     @Override
-    public void setIfUnset(final Deserializer<Void> defaultKeyDeserializer, final Deserializer<Object> defaultValueDeserializer) {
+    public void setIfUnset(final SerdeGetter getter) {
         if (leftDeserializer == null) {
-            leftDeserializer = (Deserializer<V1>) Objects.requireNonNull(defaultValueDeserializer, "defaultValueDeserializer cannot be null");
+            leftDeserializer = (Deserializer<V1>) Objects.requireNonNull(getter.valueSerde().deserializer(), "defaultValueDeserializer cannot be null");
         }
 
         if (rightDeserializer == null) {
-            rightDeserializer = (Deserializer<V2>) Objects.requireNonNull(defaultValueDeserializer, "defaultValueDeserializer cannot be null");
+            rightDeserializer = (Deserializer<V2>) Objects.requireNonNull(getter.valueSerde().deserializer(), "defaultValueDeserializer cannot be null");

Review comment:
       As above: `requireNonNull` is no longer necessary.

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/LeftOrRightValueSerializer.java
##########
@@ -40,17 +41,17 @@ public LeftOrRightValueSerializer(final Serializer<V1> leftSerializer, final Ser
 
     @SuppressWarnings("unchecked")
     @Override
-    public void setIfUnset(final Serializer<Void> defaultKeySerializer, final Serializer<Object> defaultValueSerializer) {
+    public void setIfUnset(final SerdeGetter getter) {
         if (leftSerializer == null) {
-            leftSerializer = (Serializer<V1>) Objects.requireNonNull(defaultValueSerializer, "defaultValueSerializer cannot be null");
+            leftSerializer = (Serializer<V1>) Objects.requireNonNull(getter.valueSerde().serializer(), "defaultValueSerializer cannot be null");

Review comment:
       as above

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/LeftOrRightValueSerializer.java
##########
@@ -40,17 +41,17 @@ public LeftOrRightValueSerializer(final Serializer<V1> leftSerializer, final Ser
 
     @SuppressWarnings("unchecked")
     @Override
-    public void setIfUnset(final Serializer<Void> defaultKeySerializer, final Serializer<Object> defaultValueSerializer) {
+    public void setIfUnset(final SerdeGetter getter) {
         if (leftSerializer == null) {
-            leftSerializer = (Serializer<V1>) Objects.requireNonNull(defaultValueSerializer, "defaultValueSerializer cannot be null");
+            leftSerializer = (Serializer<V1>) Objects.requireNonNull(getter.valueSerde().serializer(), "defaultValueSerializer cannot be null");
         }
 
         if (rightSerializer == null) {
-            rightSerializer = (Serializer<V2>) Objects.requireNonNull(defaultValueSerializer, "defaultValueSerializer cannot be null");
+            rightSerializer = (Serializer<V2>) Objects.requireNonNull(getter.valueSerde().serializer(), "defaultValueSerializer cannot be null");

Review comment:
       as above







[GitHub] [kafka] mjsax commented on a change in pull request #10813: KAFKA-9559: Change default serde to be `null`

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #10813:
URL: https://github.com/apache/kafka/pull/10813#discussion_r661898220



##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/TimeOrderedKeyValueBuffer.java
##########
@@ -72,7 +72,7 @@ public int hashCode() {
         }
     }
 
-    void setSerdesIfNull(final Serde<K> keySerde, final Serde<V> valueSerde);
+    void setSerdesIfNull(final ProcessorContext context);

Review comment:
       Should this be `SerdeGetter` instead of `ProcessorContext`?







[GitHub] [kafka] lct45 commented on a change in pull request #10813: KAFKA-9559: Change default serde to be `null`

Posted by GitBox <gi...@apache.org>.
lct45 commented on a change in pull request #10813:
URL: https://github.com/apache/kafka/pull/10813#discussion_r646593361



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordDeserializer.java
##########
@@ -78,6 +79,10 @@
                 throw new StreamsException("Fatal user code error in deserialization error callback", fatalUserException);
             }
 
+            if (deserializationException instanceof ConfigException) {

Review comment:
       I think it could be, but I'm not sure I understand why that might be preferable. I guess we wouldn't want the config exception overridden by a user exception? In that case, yeah, it would make sense to move it up. When I saw this in testing, the first check for user error didn't interfere with seeing the config error, but I can see how that would change when running a full app.



