Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/04/15 15:01:56 UTC

[GitHub] [kafka] vamossagar12 opened a new pull request #10542: Kafka 12313: Streamling windowed Deserialiser configs.

vamossagar12 opened a new pull request #10542:
URL: https://github.com/apache/kafka/pull/10542


   This PR aims to streamline the configurations for windowed deserializers. It deprecates the default.windowed.key.serde.inner and default.windowed.value.serde.inner configs in StreamsConfig and adds window.inner.class.deserializer. Details are described here: https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=177047930
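To make the deprecation concrete, here is a minimal sketch of a check an application could run to find uses of the two deprecated keys in its own `Properties`. The class and method names are hypothetical, and the replacement key is spelled as in this revision of the PR (the name was still under review later in the thread).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

// Hypothetical helper, not part of Kafka: flags deprecated windowed-serde keys.
final class DeprecatedWindowedConfigCheck {
    // Keys this PR deprecates.
    static final List<String> DEPRECATED_KEYS = List.of(
        "default.windowed.key.serde.inner",
        "default.windowed.value.serde.inner");

    // Replacement key as spelled in this revision of the PR.
    static final String REPLACEMENT_KEY = "window.inner.class.deserializer";

    // Returns one warning per deprecated key that is still set.
    static List<String> warnings(final Properties props) {
        final List<String> out = new ArrayList<>();
        for (final String key : DEPRECATED_KEYS) {
            if (props.containsKey(key)) {
                out.add(key + " is deprecated; see " + REPLACEMENT_KEY);
            }
        }
        return out;
    }
}
```

Note that this is not a pure rename: the old keys named a Serde class, while the SessionWindowedDeserializer hunk in this PR instantiates the new key's value as a Deserializer, so each hit needs a manual migration.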


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] ableegoldman merged pull request #10542: KAFKA-12313: Streamling windowed Deserialiser configs.

Posted by GitBox <gi...@apache.org>.
ableegoldman merged pull request #10542:
URL: https://github.com/apache/kafka/pull/10542


   





[GitHub] [kafka] ableegoldman commented on pull request #10542: KAFKA-12313: Streamling windowed Deserialiser configs.

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on pull request #10542:
URL: https://github.com/apache/kafka/pull/10542#issuecomment-842496479


   Done! 👏  
   
   @vamossagar12 I'll close out the JIRA ticket, but can you update the KIP and move it to the "Adopted" section in both the main KIP page and the Streams KIPs subpage? I can send you a link if you're not sure what I'm referring to here.





[GitHub] [kafka] vamossagar12 commented on pull request #10542: KAFKA-12313: Streamling windowed Deserialiser configs.

Posted by GitBox <gi...@apache.org>.
vamossagar12 commented on pull request #10542:
URL: https://github.com/apache/kafka/pull/10542#issuecomment-842807268


   Thanks @ableegoldman, I have updated the main KIP page, the Streams KIPs subpage, and the KIP itself. Thanks for your support!





[GitHub] [kafka] vamossagar12 commented on pull request #10542: KAFKA-12313: Streamling windowed Deserialiser configs.

Posted by GitBox <gi...@apache.org>.
vamossagar12 commented on pull request #10542:
URL: https://github.com/apache/kafka/pull/10542#issuecomment-841965396


   Hey @ableegoldman, could you please review the PR whenever you get the chance? Thanks!





[GitHub] [kafka] ableegoldman commented on a change in pull request #10542: KAFKA-12313: Streamling windowed Deserialiser configs.

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #10542:
URL: https://github.com/apache/kafka/pull/10542#discussion_r617962722



##########
File path: streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedDemo.java
##########
@@ -177,9 +177,8 @@ public static void main(final String[] args) {
         props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
         props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, JsonTimestampExtractor.class);
         props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, JSONSerde.class);
-        props.put(StreamsConfig.DEFAULT_WINDOWED_KEY_SERDE_INNER_CLASS, JSONSerde.class);
+        props.put(StreamsConfig.WINDOW_INNER_CLASS_DESERIALISER, JSONSerde.class.getName());

Review comment:
       We should be providing an instance of the TimeWindowedSerde to the Streams DSL in the application topology, not using this config. That's the idea of this KIP -- a Kafka Streams application should pass in a `new TimeWindowedSerde(innerClass, windowSize)` rather than using this like a default.
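A self-contained sketch of what "pass the windowed serde explicitly" buys: the application hands the inner deserializer and the window size straight to the constructor instead of relying on a reflective default. `WindowedKey` and `SketchTimeWindowedDeserializer` are hypothetical stand-ins, not Kafka classes, and the byte layout (inner key bytes followed by an 8-byte window-start timestamp, with no stored end) is an assumption made for illustration; under that assumption the window size is exactly what is needed to recover the window end.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.function.Function;

// Hypothetical stand-in for a windowed key: the inner key plus the window bounds.
final class WindowedKey<K> {
    final K key;
    final long start;
    final long end;

    WindowedKey(final K key, final long start, final long end) {
        this.key = key;
        this.start = start;
        this.end = end;
    }
}

// Hypothetical time-windowed deserializer. Both the inner deserializer and the
// window size are supplied by the application instead of a default config.
final class SketchTimeWindowedDeserializer<K> {
    private final Function<byte[], K> inner;
    private final long windowSizeMs;

    SketchTimeWindowedDeserializer(final Function<byte[], K> inner, final long windowSizeMs) {
        if (inner == null) {
            throw new IllegalArgumentException("inner deserializer must be provided");
        }
        this.inner = inner;
        this.windowSizeMs = windowSizeMs;
    }

    // Assumed layout: inner key bytes followed by an 8-byte big-endian window start.
    // The end is not stored, so it is derived from the window size.
    WindowedKey<K> deserialize(final byte[] data) {
        final int keyLength = data.length - Long.BYTES;
        final long start = ByteBuffer.wrap(data).getLong(keyLength);
        final K key = inner.apply(Arrays.copyOfRange(data, 0, keyLength));
        return new WindowedKey<>(key, start, start + windowSizeMs);
    }
}
```

For example, with a UTF-8 string inner deserializer and a 60 000 ms window, bytes for `"user-42"` followed by a start of 1000 decode to a window from 1000 to 61000.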

##########
File path: streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
##########
@@ -648,16 +655,11 @@
                     Serdes.ByteArraySerde.class.getName(),
                     Importance.MEDIUM,
                     DEFAULT_VALUE_SERDE_CLASS_DOC)
-            .define(DEFAULT_WINDOWED_KEY_SERDE_INNER_CLASS,
-                    Type.CLASS,
-                    null,
-                    Importance.MEDIUM,
-                    DEFAULT_WINDOWED_KEY_SERDE_INNER_CLASS_DOC)
-            .define(DEFAULT_WINDOWED_VALUE_SERDE_INNER_CLASS,
+            .define(WINDOW_INNER_CLASS_DESERIALISER,
                     Type.CLASS,
                     null,
                     Importance.MEDIUM,

Review comment:
       This should be `LOW` since the corresponding config for window size of a windowed serde is also `LOW`. The configs are grouped by Importance so you'll need to move it to the section below (and also it should be in alphabetical order).  
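The ordering rule described here (definitions grouped by importance, alphabetical within each group) can be expressed as a small check. This is a hypothetical sketch, not a test that exists in Kafka; `Importance` is a local stand-in that mirrors the HIGH/MEDIUM/LOW levels of `ConfigDef.Importance`.

```java
import java.util.List;

// Local stand-in mirroring ConfigDef.Importance; declaration order is HIGH, MEDIUM, LOW.
enum Importance { HIGH, MEDIUM, LOW }

final class ConfigEntry {
    final String name;
    final Importance importance;

    ConfigEntry(final String name, final Importance importance) {
        this.name = name;
        this.importance = importance;
    }
}

final class ConfigOrderCheck {
    // True if entries go HIGH -> MEDIUM -> LOW and are alphabetical inside each group.
    static boolean isGroupedAndSorted(final List<ConfigEntry> entries) {
        for (int i = 1; i < entries.size(); i++) {
            final ConfigEntry prev = entries.get(i - 1);
            final ConfigEntry cur = entries.get(i);
            final int byImportance = prev.importance.compareTo(cur.importance);
            if (byImportance > 0) {
                return false; // jumped back to a more important group
            }
            if (byImportance == 0 && prev.name.compareTo(cur.name) > 0) {
                return false; // out of alphabetical order within a group
            }
        }
        return true;
    }
}
```

Moving the new key to `LOW` and placing it after the `MEDIUM` definitions satisfies this check; leaving a `LOW` entry in the middle of the `MEDIUM` block does not.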

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindowedDeserializer.java
##########
@@ -45,16 +38,26 @@ public SessionWindowedDeserializer(final Deserializer<T> inner) {
     @SuppressWarnings("unchecked")
     @Override
     public void configure(final Map<String, ?> configs, final boolean isKey) {
-        if (inner == null) {
-            final String propertyName = isKey ? StreamsConfig.DEFAULT_WINDOWED_KEY_SERDE_INNER_CLASS : StreamsConfig.DEFAULT_WINDOWED_VALUE_SERDE_INNER_CLASS;
-            final String value = (String) configs.get(propertyName);
+        final String windowInnerClassDeserialiserConfig = (String) configs.get(StreamsConfig.WINDOW_INNER_CLASS_DESERIALISER);
+        Deserializer<T> windowInnerClassDeserialiserThroughConfig = null;
+        if (windowInnerClassDeserialiserConfig != null) {
             try {
-                inner = Serde.class.cast(Utils.newInstance(value, Serde.class)).deserializer();
-                inner.configure(configs, isKey);
+                windowInnerClassDeserialiserThroughConfig = Utils.newInstance(windowInnerClassDeserialiserConfig, Deserializer.class);
             } catch (final ClassNotFoundException e) {
-                throw new ConfigException(propertyName, value, "Serde class " + value + " could not be found.");
+                throw new ConfigException(StreamsConfig.WINDOW_INNER_CLASS_DESERIALISER, windowInnerClassDeserialiserConfig,
+                    "Deserialiser class " + windowInnerClassDeserialiserConfig + " could not be found.");
             }
         }
+
+        if (inner != null && windowInnerClassDeserialiserConfig != null) {
+            if (!inner.getClass().getName().equals(windowInnerClassDeserialiserConfig))
+                throw new IllegalArgumentException("Inner class deserializer config set using time windowed " +

Review comment:
       We should log an error that prints out what the two configs actually are.
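A minimal sketch of the conflict check with the requested logging: when the constructor and the config both name an inner deserializer and they disagree, both values are logged and included in the exception. `InnerDeserializerResolver` is hypothetical, it compares class-name strings rather than instances, and `java.util.logging` stands in for whatever logger the real class uses. What should happen when neither is set is not shown in this hunk, so the sketch simply returns `null` in that case.

```java
import java.util.Map;
import java.util.logging.Logger;

// Hypothetical helper, not Kafka's implementation.
final class InnerDeserializerResolver {
    private static final Logger LOG = Logger.getLogger(InnerDeserializerResolver.class.getName());

    // Config key as spelled in this revision of the PR.
    static final String CONFIG_KEY = "window.inner.class.deserializer";

    // constructorInner: class name of the deserializer passed to the constructor, or null.
    // Returns whichever class name is set; throws if both are set and disagree.
    static String resolve(final String constructorInner, final Map<String, ?> configs) {
        final Object configured = configs.get(CONFIG_KEY);
        final String configInner = configured == null ? null : configured.toString();

        if (constructorInner != null && configInner != null && !constructorInner.equals(configInner)) {
            final String msg = "Inner deserializer passed to the constructor (" + constructorInner
                + ") differs from the one set in " + CONFIG_KEY + " (" + configInner + ")";
            LOG.severe(msg); // log both values before failing
            throw new IllegalArgumentException(msg);
        }
        // Neither set: not covered by the diff, so this sketch returns null.
        return constructorInner != null ? constructorInner : configInner;
    }
}
```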

##########
File path: streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
##########
@@ -380,16 +380,23 @@
 
     /** {@code default.windowed.key.serde.inner} */
     @SuppressWarnings("WeakerAccess")
+    @Deprecated
     public static final String DEFAULT_WINDOWED_KEY_SERDE_INNER_CLASS = "default.windowed.key.serde.inner";
     private static final String DEFAULT_WINDOWED_KEY_SERDE_INNER_CLASS_DOC = "Default serializer / deserializer for the inner class of a windowed key. Must implement the " +
         "<code>org.apache.kafka.common.serialization.Serde</code> interface.";
 
     /** {@code default.windowed.value.serde.inner} */
     @SuppressWarnings("WeakerAccess")
+    @Deprecated
     public static final String DEFAULT_WINDOWED_VALUE_SERDE_INNER_CLASS = "default.windowed.value.serde.inner";
     private static final String DEFAULT_WINDOWED_VALUE_SERDE_INNER_CLASS_DOC = "Default serializer / deserializer for the inner class of a windowed value. Must implement the " +
         "<code>org.apache.kafka.common.serialization.Serde</code> interface.";
 
+    public static final String WINDOW_INNER_CLASS_DESERIALISER = "window.inner.class.deserializer";

Review comment:
       I know this is the name we voted on in the KIP, but I wonder if it should be `WINDOWED` instead of `WINDOW`? I honestly didn't notice it before, but "windowed" seems more consistent with the actual Serde name. 
   If you agree, you can just update the KIP and send a quick note to the KIP thread on the mailing list to clarify the name change.







[GitHub] [kafka] ableegoldman commented on pull request #10542: KAFKA-12313: Streamling windowed Deserialiser configs.

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on pull request #10542:
URL: https://github.com/apache/kafka/pull/10542#issuecomment-827215366


   Hey @vamossagar12 , I think we're good to go on the update you proposed to this KIP. Take a look at the other feedback I left and just ping me when the PR is ready for review again. Thanks!





[GitHub] [kafka] vamossagar12 commented on pull request #10542: KAFKA-12313: Streamling windowed Deserialiser configs.

Posted by GitBox <gi...@apache.org>.
vamossagar12 commented on pull request #10542:
URL: https://github.com/apache/kafka/pull/10542#issuecomment-823770411


   Hey @ableegoldman, please review whenever you get the chance.





[GitHub] [kafka] vamossagar12 commented on pull request #10542: KAFKA-12313: Streamling windowed Deserialiser configs.

Posted by GitBox <gi...@apache.org>.
vamossagar12 commented on pull request #10542:
URL: https://github.com/apache/kafka/pull/10542#issuecomment-825558068


   Thanks @ableegoldman. Actually, I think I had already updated the ticket to note that we need to include serializers as well. I have updated the KIP and sent it out again for discussion. Thanks!





[GitHub] [kafka] vamossagar12 commented on pull request #10542: KAFKA-12313: Streamling windowed Deserialiser configs.

Posted by GitBox <gi...@apache.org>.
vamossagar12 commented on pull request #10542:
URL: https://github.com/apache/kafka/pull/10542#issuecomment-830052287


   @ableegoldman, I have addressed all the comments. Please review whenever you get the chance.




