Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/09/25 05:07:59 UTC

[GitHub] [kafka] thake opened a new pull request #9338: Fixed KAFKA-10515: Serdes used within metered state stores will now be initialized with the default serdes if not already set.

thake opened a new pull request #9338:
URL: https://github.com/apache/kafka/pull/9338


   Also introduced the notion of WrappingNullableSerdes (aligned with the concept of WrappingNullableSerializer and WrappingNullableDeserializer) and centralized initialization in WrappingNullables.
   
   The added integration test KTableKTableForeignKeyJoinDistributedTest checks that all serdes are now correctly set on all Streams clients.
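A minimal sketch of the "set if unset" idea, assuming simplified stand-in types: `SimpleSerializer` and `PairSerializer` are hypothetical and are not Kafka's `Serializer`, `WrappingNullableSerializer`, or `WrappingNullableSerde`. A wrapping serializer may be created without its inner serializers, and only the missing ones are filled in later from the configured defaults.

```java
import java.nio.charset.StandardCharsets;
import java.util.Map;

// Hypothetical, simplified stand-in for Kafka's Serializer (not the real interface).
interface SimpleSerializer<T> {
    byte[] serialize(T data);
}

// Hypothetical wrapping serializer that writes a key/value pair as "key|value".
// Either inner serializer may be null when the user relied on the configured defaults.
final class PairSerializer<K, V> implements SimpleSerializer<Map.Entry<K, V>> {
    private SimpleSerializer<K> keySerializer;
    private SimpleSerializer<V> valueSerializer;

    PairSerializer(final SimpleSerializer<K> keySerializer, final SimpleSerializer<V> valueSerializer) {
        this.keySerializer = keySerializer;
        this.valueSerializer = valueSerializer;
    }

    // Fill in only the inner serializers that were not explicitly provided;
    // anything the user set stays untouched.
    void setIfUnset(final SimpleSerializer<K> defaultKeySerializer,
                    final SimpleSerializer<V> defaultValueSerializer) {
        if (keySerializer == null) {
            keySerializer = defaultKeySerializer;
        }
        if (valueSerializer == null) {
            valueSerializer = defaultValueSerializer;
        }
    }

    @Override
    public byte[] serialize(final Map.Entry<K, V> pair) {
        if (keySerializer == null || valueSerializer == null) {
            // Fail loudly if the wrapper is used before its inner serializers are set.
            throw new IllegalStateException("inner serializers have not been initialized");
        }
        final String key = new String(keySerializer.serialize(pair.getKey()), StandardCharsets.UTF_8);
        final String value = new String(valueSerializer.serialize(pair.getValue()), StandardCharsets.UTF_8);
        return (key + "|" + value).getBytes(StandardCharsets.UTF_8);
    }
}
```

Applying defaults conditionally keeps explicit user choices intact: a key serializer passed to the constructor survives a later `setIfUnset` call.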
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] bbejeck commented on pull request #9338: Fixed KAFKA-10515: Serdes used within metered state stores will now be initialized with the default serdes if not already set.

Posted by GitBox <gi...@apache.org>.
bbejeck commented on pull request #9338:
URL: https://github.com/apache/kafka/pull/9338#issuecomment-706287686


   @thake can you rebase this PR then ping us here so we can review it?
   
   Thanks!





[GitHub] [kafka] vvcephei commented on a change in pull request #9338: Fixed KAFKA-10515: Serdes used within metered state stores will now be initialized with the default serdes if not already set.

Posted by GitBox <gi...@apache.org>.
vvcephei commented on a change in pull request #9338:
URL: https://github.com/apache/kafka/pull/9338#discussion_r504228659



##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
##########
@@ -103,6 +104,13 @@ public void init(final StateStoreContext context,
         // register and possibly restore the state from the logs
         maybeMeasureLatency(() -> super.init(context, root), time, restoreSensor);
     }
+    protected Serde<K> prepareKeySerde(final Serde<K> keySerde, final Serde<?> contextKeySerde, final Serde<?> contextValueSerde) {

Review comment:
       This can be `private`
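To make the fallback concrete, here is a hedged sketch of what a `prepareKeySerde(keySerde, contextKeySerde, contextValueSerde)` helper could do. `SketchSerde`, `SketchWrappingSerde`, and `SerdePreparation` are hypothetical, and the order (explicit serde, then context default, then `setIfUnset` on wrapping serdes) is an assumption based on the PR description, not the merged code. Because the logic depends only on its arguments, nothing in it needs to be overridable.

```java
// Hypothetical minimal stand-in for Kafka's Serde; only a label is needed here.
interface SketchSerde<T> {
    String describe();
}

// Hypothetical wrapping serde whose inner serdes can be completed from the defaults.
interface SketchWrappingSerde<T> extends SketchSerde<T> {
    void setIfUnset(SketchSerde<?> defaultKeySerde, SketchSerde<?> defaultValueSerde);
}

final class SerdePreparation {
    private SerdePreparation() { }

    // Use the store-specific serde if one was given, otherwise fall back to the
    // context's default key serde; then let a wrapping serde fill in its inner serdes.
    @SuppressWarnings("unchecked")
    static <K> SketchSerde<K> prepareKeySerde(final SketchSerde<K> keySerde,
                                              final SketchSerde<?> contextKeySerde,
                                              final SketchSerde<?> contextValueSerde) {
        final SketchSerde<K> serdeToUse =
            keySerde != null ? keySerde : (SketchSerde<K>) contextKeySerde;
        if (serdeToUse == null) {
            throw new IllegalStateException("no key serde was given and no default is configured");
        }
        if (serdeToUse instanceof SketchWrappingSerde) {
            ((SketchWrappingSerde<K>) serdeToUse).setIfUnset(contextKeySerde, contextValueSerde);
        }
        return serdeToUse;
    }
}
```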

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
##########
@@ -112,28 +120,26 @@ private void registerMetrics() {
     }
 
     @Deprecated
-    @SuppressWarnings("unchecked")
     void initStoreSerde(final ProcessorContext context) {

Review comment:
       This (and the StateStoreContext one) can be `private` now. Thanks for factoring out the smallest chunk that needs to be overridden in the subclass. I wish I'd thought of it.
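The pattern described here can be sketched as a template method: the initialization flow is private and only one small hook stays overridable. The classes below are hypothetical, serdes are represented by plain strings to keep the sketch self-contained, and wrapping the default in a `ValueAndTimestamp(...)` label only illustrates a subclass that needs a different value serde.

```java
// Hypothetical base store. The initialization flow is fixed and private;
// serdes are represented by plain strings to keep the sketch self-contained.
class SketchStore {
    private final String userValueSerde; // null means "use the configured default"
    private String valueSerde;

    SketchStore(final String userValueSerde) {
        this.userValueSerde = userValueSerde;
    }

    // Entry point called during initialization; subclasses cannot change the flow.
    final void init(final String contextValueSerde) {
        initStoreSerde(contextValueSerde);
    }

    // Private: the whole procedure stays an implementation detail of the base class.
    private void initStoreSerde(final String contextValueSerde) {
        valueSerde = prepareValueSerde(userValueSerde, contextValueSerde);
    }

    // The one overridable step: how the value serde is derived.
    protected String prepareValueSerde(final String specificSerde, final String contextValueSerde) {
        return specificSerde != null ? specificSerde : contextValueSerde;
    }

    final String valueSerde() {
        return valueSerde;
    }
}

// Hypothetical subclass that must wrap the default value serde instead of using it as-is.
class SketchTimestampedStore extends SketchStore {
    SketchTimestampedStore(final String userValueSerde) {
        super(userValueSerde);
    }

    @Override
    protected String prepareValueSerde(final String specificSerde, final String contextValueSerde) {
        return specificSerde != null ? specificSerde : "ValueAndTimestamp(" + contextValueSerde + ")";
    }
}
```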

##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinDistributedTest.java
##########
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.integration;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.ValueJoiner;
+import org.apache.kafka.streams.processor.ThreadMetadata;
+import org.apache.kafka.test.IntegrationTest;
+import org.apache.kafka.test.TestUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Properties;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.function.Predicate;
+
+import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
+import static org.junit.Assert.assertEquals;
+
+@Category({IntegrationTest.class})
+public class KTableKTableForeignKeyJoinDistributedTest {
+    private static final int NUM_BROKERS = 1;
+    private static final String LEFT_TABLE = "left_table";
+    private static final String RIGHT_TABLE = "right_table";
+    private static final String OUTPUT = "output-topic";
+    @ClassRule
+    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS);
+    private static final Properties CONSUMER_CONFIG = new Properties();
+
+    @Rule
+    public TestName testName = new TestName();
+
+
+    private static final String INPUT_TOPIC = "input-topic";
+
+    private KafkaStreams client1;
+    private KafkaStreams client2;
+
+    private volatile boolean client1IsOk = false;
+    private volatile boolean client2IsOk = false;
+
+    @BeforeClass
+    public static void createTopics() throws InterruptedException {
+        CLUSTER.createTopic(INPUT_TOPIC, 2, 1);
+    }
+
+    @Before
+    public void setupTopics() throws InterruptedException {
+        CLUSTER.createTopic(LEFT_TABLE, 1, 1);
+        CLUSTER.createTopic(RIGHT_TABLE, 1, 1);
+        CLUSTER.createTopic(OUTPUT, 11, 1);
+
+        //Fill test tables
+        final Properties producerConfig = new Properties();
+        producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+        producerConfig.put(ProducerConfig.ACKS_CONFIG, "all");
+        producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+        producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+        final List<KeyValue<String, String>> leftTable = Arrays.asList(
+                new KeyValue<>("lhsValue1", "lhsValue1|rhs1"),
+                new KeyValue<>("lhsValue2", "lhsValue2|rhs2"),
+                new KeyValue<>("lhsValue3", "lhsValue3|rhs3"),
+                new KeyValue<>("lhsValue4", "lhsValue4|rhs4")
+        );
+        final List<KeyValue<String, String>> rightTable = Arrays.asList(
+                new KeyValue<>("rhs1", "rhsValue1"),
+                new KeyValue<>("rhs2", "rhsValue2"),
+                new KeyValue<>("rhs3", "rhsValue3")
+        );
+
+        IntegrationTestUtils.produceKeyValuesSynchronously(LEFT_TABLE, leftTable, producerConfig, CLUSTER.time);
+        IntegrationTestUtils.produceKeyValuesSynchronously(RIGHT_TABLE, rightTable, producerConfig, CLUSTER.time);
+
+        CONSUMER_CONFIG.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+        CONSUMER_CONFIG.put(ConsumerConfig.GROUP_ID_CONFIG, "ktable-ktable-distributed-consumer");
+        CONSUMER_CONFIG.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+        CONSUMER_CONFIG.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+    }
+
+    @After
+    public void after() {
+        client1.close();
+        client2.close();

Review comment:
       It would be good to also clean up the local disk and the topics after the test: `org.apache.kafka.streams.integration.utils.IntegrationTestUtils#quietlyCleanStateAfterTest`
   
   Not always necessary, but it keeps tests from becoming flaky after seemingly unrelated changes.
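The comment points to `IntegrationTestUtils#quietlyCleanStateAfterTest`, which this sketch does not reproduce. Assuming only the JDK, the hypothetical `QuietCleanup.quietlyCleanUp` helper below illustrates the local-disk half of the idea: close every client and delete the state directory, swallowing errors so that teardown can never fail or mask the test itself. Topic cleanup is out of scope here.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Stream;

// Hypothetical helper (not Kafka's IntegrationTestUtils): best-effort teardown that
// never throws, so one failing step cannot hide the test result or skip later steps.
final class QuietCleanup {
    private QuietCleanup() { }

    static void quietlyCleanUp(final List<? extends AutoCloseable> clients, final Path stateDir) {
        for (final AutoCloseable client : clients) {
            try {
                client.close();
            } catch (final Exception e) {
                // ignore: cleanup must not fail the test
            }
        }
        if (stateDir == null || !Files.exists(stateDir)) {
            return;
        }
        // Delete children before parents by walking the tree in reverse order.
        try (Stream<Path> paths = Files.walk(stateDir)) {
            paths.sorted(Comparator.reverseOrder()).forEach(path -> {
                try {
                    Files.deleteIfExists(path);
                } catch (final IOException e) {
                    // ignore and keep deleting the rest
                }
            });
        } catch (final IOException e) {
            // ignore: best effort
        }
    }
}
```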

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/WrappingNullableSerde.java
##########
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.Serializer;
+
+public interface WrappingNullableSerde<T, InnerK, InnerV> extends Serde<T> {
+    @SuppressWarnings({"unchecked", "rawtypes"})
+    default void setIfUnset(final Serde<InnerK> defaultKeySerde, final Serde<InnerV> defaultValueSerde) {
+        final Serializer<T> serializer = this.serializer();

Review comment:
       This is probably hypocritical, since I've surely written similar code, but this makes me a little uncomfortable, since there's no guarantee that `serializer()` would return the same reference every time it's called. I'd propose to make this an abstract class instead so we can make this guarantee ourselves.
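A minimal sketch of the concern and of the proposed fix, using hypothetical simplified types (`LazySerializer`, `FreshEachCall`, `StableSerde`) rather than Kafka's real ones. If `serializer()` may hand out a new instance per call, a `setIfUnset` applied to one instance is silently lost; an abstract class that stores the reference in a `final` field rules that out.

```java
import java.util.Objects;

// Hypothetical mutable serializer whose inner part may be filled in later.
final class LazySerializer {
    private String inner; // null until a default is applied

    void setIfUnset(final String defaultInner) {
        if (inner == null) {
            inner = defaultInner;
        }
    }

    String inner() {
        return inner;
    }
}

// Nothing in an interface stops an implementation from returning a new
// instance on every call, so a mutation applied to one result can be lost.
interface FreshEachCall {
    LazySerializer serializer();
}

// The abstract class stores the reference once, in a final field, so every
// call to serializer() returns the instance that setIfUnset() mutated.
abstract class StableSerde {
    private final LazySerializer serializer;

    protected StableSerde(final LazySerializer serializer) {
        this.serializer = Objects.requireNonNull(serializer, "serializer can't be null");
    }

    public final LazySerializer serializer() {
        return serializer;
    }

    public final void setIfUnset(final String defaultInner) {
        serializer.setIfUnset(defaultInner);
    }
}
```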







[GitHub] [kafka] vvcephei commented on pull request #9338: KAFKA-10515: Properly initialize nullable Serdes with default values

Posted by GitBox <gi...@apache.org>.
vvcephei commented on pull request #9338:
URL: https://github.com/apache/kafka/pull/9338#issuecomment-713172747


   cherry-picked to 2.7





[GitHub] [kafka] thake commented on a change in pull request #9338: KAFKA-10515: Properly initialize nullable Serdes with default values

Posted by GitBox <gi...@apache.org>.
thake commented on a change in pull request #9338:
URL: https://github.com/apache/kafka/pull/9338#discussion_r508392317



##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
##########
@@ -112,28 +120,26 @@ private void registerMetrics() {
     }
 
     @Deprecated
-    @SuppressWarnings("unchecked")
     void initStoreSerde(final ProcessorContext context) {

Review comment:
       Done

##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinDistributedTest.java
##########
@@ -0,0 +1,232 @@
+    @After
+    public void after() {
+        client1.close();
+        client2.close();

Review comment:
       Done







[GitHub] [kafka] vvcephei commented on pull request #9338: Fixed KAFKA-10515: Serdes used within metered state stores will now be initialized with the default serdes if not already set.

Posted by GitBox <gi...@apache.org>.
vvcephei commented on pull request #9338:
URL: https://github.com/apache/kafka/pull/9338#issuecomment-706290399


   Hi @thake, I'm really sorry I missed your earlier ping. I'll review it.
   
   As Bill said, it will need to be rebased (or you can merge trunk in) to resolve those merge conflicts.





[GitHub] [kafka] thake commented on pull request #9338: KAFKA-10515: Properly initialize nullable Serdes with default values

Posted by GitBox <gi...@apache.org>.
thake commented on pull request #9338:
URL: https://github.com/apache/kafka/pull/9338#issuecomment-713070979


   @vvcephei everything should now be ready for a second review. I've applied your changes and worked in your comments.





[GitHub] [kafka] thake commented on pull request #9338: Fixed KAFKA-10515: Serdes used within metered state stores will now be initialized with the default serdes if not already set.

Posted by GitBox <gi...@apache.org>.
thake commented on pull request #9338:
URL: https://github.com/apache/kafka/pull/9338#issuecomment-706352561


   @vvcephei no problem.
   
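   I just tried to rebase this on the current master and noticed that ProcessorContext and StateStoreContext are now separate interfaces. Since the two interfaces do not share a common super-interface, I am forced to duplicate a lot of my code. Would it be possible to create a super-interface such as "TaskContext"?
   ```java
   import java.io.File;
   import java.util.Map;
   
   import org.apache.kafka.common.serialization.Serde;
   import org.apache.kafka.streams.StreamsMetrics;
   import org.apache.kafka.streams.processor.TaskId;
   
   public interface TaskContext {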
       /**
        * Returns the application id.
        *
        * @return the application id
        */
       String applicationId();
   
       /**
        * Returns the task id.
        *
        * @return the task id
        */
       TaskId taskId();
   
       /**
        * Returns the default key serde.
        *
        * @return the default key serde
        */
       Serde<?> keySerde();
   
       /**
        * Returns the default value serde.
        *
        * @return the default value serde
        */
       Serde<?> valueSerde();
   
       /**
        * Returns the state directory for the partition.
        *
        * @return the state directory
        */
       File stateDir();
   
       /**
        * Returns Metrics instance.
        *
        * @return StreamsMetrics
        */
       StreamsMetrics metrics();
   
       /**
        * Returns all the application config properties as key/value pairs.
        *
        * <p> The config properties are defined in the {@link org.apache.kafka.streams.StreamsConfig}
        * object and associated to the ProcessorContext.
        *
        * <p> The type of the values is dependent on the {@link org.apache.kafka.common.config.ConfigDef.Type type} of the property
        * (e.g. the value of {@link org.apache.kafka.streams.StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG DEFAULT_KEY_SERDE_CLASS_CONFIG}
        * will be of type {@link Class}, even if it was specified as a String to
        * {@link org.apache.kafka.streams.StreamsConfig#StreamsConfig(Map) StreamsConfig(Map)}).
        *
        * @return all the key/values from the StreamsConfig properties
        */
       Map<String, Object> appConfigs();
   
       /**
        * Returns all the application config properties with the given key prefix, as key/value pairs
        * stripping the prefix.
        *
        * <p> The config properties are defined in the {@link org.apache.kafka.streams.StreamsConfig}
        * object and associated to the ProcessorContext.
        *
        * @param prefix the properties prefix
        * @return the key/values matching the given prefix from the StreamsConfig properties.
        */
       Map<String, Object> appConfigsWithPrefix(String prefix);
   }
   
   ```
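For comparison with the current split, a hedged sketch of how a shared super-interface would let internal code be written once. The types are hypothetical and trimmed to two shared members; the point is only that a helper typed against the super-interface accepts both context types. The string "default.key.serde" is used purely as a map key in this sketch.

```java
import java.util.Map;

// Hypothetical, heavily trimmed version of the proposed super-interface.
interface SharedTaskContext {
    String applicationId();
    Map<String, Object> appConfigs();
}

// The two public context types stay distinct but inherit the shared members.
interface ProcessorContextSketch extends SharedTaskContext {
    void forward(String key, String value);
}

interface StateStoreContextSketch extends SharedTaskContext {
    void register(String storeName);
}

final class SharedContextHelpers {
    private SharedContextHelpers() { }

    // Written once against the super-interface, so it accepts either context type.
    static String configuredDefaultKeySerde(final SharedTaskContext context) {
        final Object value = context.appConfigs().get("default.key.serde");
        return value == null ? "<none>" : value.toString();
    }
}
```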
   





[GitHub] [kafka] vvcephei commented on pull request #9338: Fixed KAFKA-10515: Serdes used within metered state stores will now be initialized with the default serdes if not already set.

Posted by GitBox <gi...@apache.org>.
vvcephei commented on pull request #9338:
URL: https://github.com/apache/kafka/pull/9338#issuecomment-707890589


   Thanks for the feedback, @thake. Your perspective makes perfect sense. It would certainly simplify our internal code, and it would also help to organize the API a little more. There are a few reasons I leaned more toward the "flat" API:
   1. Immediate concern: I didn't want the KIP to get lost in the weeds of designing a whole hierarchy of contexts and deciding where each member should reside. As it was, the KIP already took an incredibly long time to converge, and splitting out StateStoreContext dragged us into a new round of proposals about reorganizing the "record context" (which became Record and RecordMetadata). We were down to the wire for 2.7 at that point, and shipping something seemed better than shipping nothing.
   2. Long-term concern: I was afraid I'd be creating a situation in which every single update to any context would kick off a whole new round of bikeshedding about which part of the hierarchy the new thing should belong to, or whether we need a new level in the hierarchy, etc. etc. It seems like having two completely independent kinds of context (for state stores and for processors) would drive people to think only of where something new is _needed_, not where it _belongs_, avoiding the abstract philosophical discussions that programmers are prone to.
   3. Related long-term concern, coupling: the notion of "context" is similar, but not identical, across state stores and processors. We have already run into a case where something shared needed to be restricted to just one of the contexts. If there were any kind of shared interface (like a super-interface or a common interface as a member, like `getTaskContext()`), then "un-sharing" a member would be much harder than simply deprecating an un-shared member where it is inappropriate.
   4. Usability: The idea of organizing information for users has two sides. On one hand, it might be nice to be able to say "ah, this is a task property, so I know to look for it in the TaskContext." But on the other hand, if you need (for example) the configured serde, and you don't see it in your context, and you've forgotten that it's filed away under "task context", then you'll have to go hunting for it. Then, once you find it, you'll have to try to remember for the future where it's kept, occupying precious mental space that you could be using for other stuff. There's an art to striking a balance between a "wide" API and a "deep" one: a flat interface with just a few members is trivially easy to use, but a flat interface with too many members is a burden. Once an interface becomes too wide, some amount of organization is a benefit, but too many levels of organization, or too many internal nodes in the interface tree at all, is also a burden. It doesn't feel like we have too many members in either StateStoreContext or ProcessorContext now, so I'm hesitant to organize further at all.
   
   A moderate amount of duplication in the internal code is the price we pay for these trade-offs. If the duplication itself looked too extensive or risky, then that would be another argument to converge the types, but so far, it doesn't seem too bad.
   
   Anyway, time will tell if this was the right tradeoff. We can always shuffle stuff around in the future. Does this all seem to make sense?





[GitHub] [kafka] thake commented on pull request #9338: KAFKA-10515: Properly initialize nullable Serdes with default values

Posted by GitBox <gi...@apache.org>.
thake commented on pull request #9338:
URL: https://github.com/apache/kafka/pull/9338#issuecomment-712458128


   Sorry for not responding sooner. I'm currently on vacation, but I'll do my best to update the PR tomorrow.





[GitHub] [kafka] thake commented on pull request #9338: Fixed KAFKA-10515: Serdes used within metered state stores will now be initialized with the default serdes if not already set.

Posted by GitBox <gi...@apache.org>.
thake commented on pull request #9338:
URL: https://github.com/apache/kafka/pull/9338#issuecomment-706587502


   @vvcephei You were right. Duplication in WrappingNullables was my biggest concern. I've refactored it the way you proposed. It was not a big deal :) So no worries!
   
   Just a suggestion regarding the API of StateStoreContext and ProcessorContext: wouldn't it be possible to extract the properties that belong to the task into a separate entity (TaskContext) and provide a getter in StateStoreContext and ProcessorContext to retrieve it? Designing it this way would still decouple StateStoreContext and ProcessorContext, but at the same time it would make it easier to write common code that only needs information from the TaskContext. It would also carry the relationships between task, processor, and state over to the "context" level. I know this is late feedback on this refactoring, since the KIP has already been accepted, but I wanted to share my thoughts before it becomes public API.
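The getter-based alternative suggested here (composition instead of inheritance) could look roughly like this. All names are hypothetical and the task-level part is trimmed to two members; common code depends only on the extracted task part, while the two context types remain unrelated to each other.

```java
// Hypothetical task-level part, extracted into its own type.
interface TaskContextPart {
    String applicationId();
    String defaultKeySerde();
}

// The two context types stay unrelated; each only exposes the shared part via a getter.
interface ComposedProcessorContext {
    TaskContextPart taskContext();
}

interface ComposedStateStoreContext {
    TaskContextPart taskContext();
}

final class TaskLevelCode {
    private TaskLevelCode() { }

    // Common code depends only on the task-level part, so either context can supply it.
    static String describe(final TaskContextPart task) {
        return task.applicationId() + " uses " + task.defaultKeySerde();
    }
}
```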





[GitHub] [kafka] vvcephei edited a comment on pull request #9338: KAFKA-10515: Properly initialize nullable Serdes with default values

Posted by GitBox <gi...@apache.org>.
vvcephei edited a comment on pull request #9338:
URL: https://github.com/apache/kafka/pull/9338#issuecomment-713173834


   Looks like there are a bunch of merge conflicts for 2.6, probably because trunk/2.7 are post-KIP-478.
   
   I think since 2.7 is in progress right now, we could just stop here. What do you think, @thake ?
   
   If we want it in 2.6, we should do a new PR targeted at that branch, so that we can get Jenkins to run the whole test suite for us.





[GitHub] [kafka] thake commented on a change in pull request #9338: KAFKA-10515: Properly initialize nullable Serdes with default values

Posted by GitBox <gi...@apache.org>.
thake commented on a change in pull request #9338:
URL: https://github.com/apache/kafka/pull/9338#discussion_r504924920



##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/WrappingNullableSerde.java
##########
@@ -0,0 +1,36 @@
+public interface WrappingNullableSerde<T, InnerK, InnerV> extends Serde<T> {
+    @SuppressWarnings({"unchecked", "rawtypes"})
+    default void setIfUnset(final Serde<InnerK> defaultKeySerde, final Serde<InnerV> defaultValueSerde) {
+        final Serializer<T> serializer = this.serializer();

Review comment:
   Ah, my bad. I didn't get the API right. I've probably worked too much on Kotlin code with a lot of immutables :) The abstract class is a good idea to circumvent this problem. I'll check the PR and comment on it directly.
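The abstract-class idea can be sketched roughly as follows. This is a minimal, self-contained sketch, not the code that was merged: `SimpleSerializer`, `SimpleDeserializer`, `SimpleSerde`, `WrappingSerializer`, `WrappingDeserializer`, `WrappingNullableSerdeBase`, `NullableSerializer`, `NullableDeserializer`, `NullableStringSerde` and `Utf8Serde` are hypothetical stand-ins so it compiles without Kafka on the classpath. What it illustrates is that the base class keeps its de/serializers in final fields, so the objects configured by `setIfUnset` are the same objects later returned by `serializer()` and `deserializer()`.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical, simplified stand-ins for Kafka's Serializer, Deserializer and Serde.
interface SimpleSerializer<T> { byte[] serialize(T data); }

interface SimpleDeserializer<T> { T deserialize(byte[] data); }

interface SimpleSerde<T> {
    SimpleSerializer<T> serializer();
    SimpleDeserializer<T> deserializer();
}

// Wrapping de/serializers whose inner key/value de/serializers may still be
// missing until defaults are supplied (illustrative names).
interface WrappingSerializer<T, K, V> extends SimpleSerializer<T> {
    void setIfUnset(SimpleSerializer<K> defaultKey, SimpleSerializer<V> defaultValue);
}

interface WrappingDeserializer<T, K, V> extends SimpleDeserializer<T> {
    void setIfUnset(SimpleDeserializer<K> defaultKey, SimpleDeserializer<V> defaultValue);
}

// Base class: the de/serializers live in final fields, so serializer() and
// deserializer() always return the same objects that setIfUnset configured.
abstract class WrappingNullableSerdeBase<T, K, V> implements SimpleSerde<T> {
    private final WrappingSerializer<T, K, V> serializer;
    private final WrappingDeserializer<T, K, V> deserializer;

    protected WrappingNullableSerdeBase(final WrappingSerializer<T, K, V> serializer,
                                        final WrappingDeserializer<T, K, V> deserializer) {
        this.serializer = serializer;
        this.deserializer = deserializer;
    }

    @Override
    public SimpleSerializer<T> serializer() { return serializer; }

    @Override
    public SimpleDeserializer<T> deserializer() { return deserializer; }

    public void setIfUnset(final SimpleSerde<K> defaultKeySerde, final SimpleSerde<V> defaultValueSerde) {
        serializer.setIfUnset(defaultKeySerde.serializer(), defaultValueSerde.serializer());
        deserializer.setIfUnset(defaultKeySerde.deserializer(), defaultValueSerde.deserializer());
    }
}

// Example wrapper: passes null through, otherwise delegates to an inner
// value serializer that is only known once defaults are applied.
class NullableSerializer implements WrappingSerializer<String, String, String> {
    private SimpleSerializer<String> inner;

    @Override
    public void setIfUnset(final SimpleSerializer<String> defaultKey, final SimpleSerializer<String> defaultValue) {
        if (inner == null) inner = defaultValue;
    }

    @Override
    public byte[] serialize(final String data) { return data == null ? null : inner.serialize(data); }
}

class NullableDeserializer implements WrappingDeserializer<String, String, String> {
    private SimpleDeserializer<String> inner;

    @Override
    public void setIfUnset(final SimpleDeserializer<String> defaultKey, final SimpleDeserializer<String> defaultValue) {
        if (inner == null) inner = defaultValue;
    }

    @Override
    public String deserialize(final byte[] data) { return data == null ? null : inner.deserialize(data); }
}

class NullableStringSerde extends WrappingNullableSerdeBase<String, String, String> {
    NullableStringSerde() { super(new NullableSerializer(), new NullableDeserializer()); }
}

// Stateless UTF-8 String serde standing in for the configured default serde.
class Utf8Serde implements SimpleSerde<String> {
    @Override
    public SimpleSerializer<String> serializer() { return s -> s.getBytes(StandardCharsets.UTF_8); }

    @Override
    public SimpleDeserializer<String> deserializer() { return b -> new String(b, StandardCharsets.UTF_8); }
}
```

In this sketch, a fresh `NullableStringSerde` has no inner serializer; after `setIfUnset(new Utf8Serde(), new Utf8Serde())` it round-trips `"abc"` and passes `null` through unchanged.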







[GitHub] [kafka] vvcephei commented on pull request #9338: KAFKA-10515: Properly initialize nullable Serdes with default values

Posted by GitBox <gi...@apache.org>.
vvcephei commented on pull request #9338:
URL: https://github.com/apache/kafka/pull/9338#issuecomment-712471965


   Thanks, @thake ! Sorry to disturb your vacation.
   
   Please let me know if you want me to finish up this PR for you. If it helps, I could incorporate your responses to my PR, then merge it into your PR, then take care of the other minor remarks I made in review before merging.
   
   I feel like I owe it to you after taking so long to review it. But I also don't want to usurp you, if you prefer to do it yourself.
   
   Thanks,
   -John





[GitHub] [kafka] thake commented on pull request #9338: Fixed KAFKA-10515: Serdes used within metered state stores will now be initialized with the default serdes if not already set.

Posted by GitBox <gi...@apache.org>.
thake commented on pull request #9338:
URL: https://github.com/apache/kafka/pull/9338#issuecomment-707907130


   The code should now be ready for review.





[GitHub] [kafka] thake commented on a change in pull request #9338: KAFKA-10515: Properly initialize nullable Serdes with default values

Posted by GitBox <gi...@apache.org>.
thake commented on a change in pull request #9338:
URL: https://github.com/apache/kafka/pull/9338#discussion_r508392735



##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
##########
@@ -103,6 +104,13 @@ public void init(final StateStoreContext context,
         // register and possibly restore the state from the logs
         maybeMeasureLatency(() -> super.init(context, root), time, restoreSensor);
     }
+    protected Serde<K> prepareKeySerde(final Serde<K> keySerde, final Serde<?> contextKeySerde, final Serde<?> contextValueSerde) {

Review comment:
       Removed the method completely and added a static import as you proposed in your PR.







[GitHub] [kafka] thake commented on pull request #9338: KAFKA-10515: Properly initialize nullable Serdes with default values

Posted by GitBox <gi...@apache.org>.
thake commented on pull request #9338:
URL: https://github.com/apache/kafka/pull/9338#issuecomment-713278824


   Sounds reasonable. What is the time line for the next release on 2.6?





[GitHub] [kafka] vvcephei commented on pull request #9338: KAFKA-10515: Properly initialize nullable Serdes with default values

Posted by GitBox <gi...@apache.org>.
vvcephei commented on pull request #9338:
URL: https://github.com/apache/kafka/pull/9338#issuecomment-713167720


   Merged to trunk! I'm cherry-picking to the older branches now...





[GitHub] [kafka] vvcephei commented on pull request #9338: KAFKA-10515: Properly initialize nullable Serdes with default values

Posted by GitBox <gi...@apache.org>.
vvcephei commented on pull request #9338:
URL: https://github.com/apache/kafka/pull/9338#issuecomment-713095415


   Great! Thanks @thake . I'll take a look now.





[GitHub] [kafka] vvcephei commented on pull request #9338: Fixed KAFKA-10515: Serdes used within metered state stores will now be initialized with the default serdes if not already set.

Posted by GitBox <gi...@apache.org>.
vvcephei commented on pull request #9338:
URL: https://github.com/apache/kafka/pull/9338#issuecomment-708466216


   Oh, sorry, @thake , I didn't see your replies when I submitted my review last night.
   
   Thanks both for your initial feedback and your reply. I'd certainly never pass up the opportunity to hear such concerns or ideas, especially before a release, and I really appreciate that you took the time to raise them.
   
   Also, thanks for your contribution in this PR!





[GitHub] [kafka] thake commented on pull request #9338: Fixed KAFKA-10515: Serdes used within metered state stores will now be initialized with the default serdes if not already set.

Posted by GitBox <gi...@apache.org>.
thake commented on pull request #9338:
URL: https://github.com/apache/kafka/pull/9338#issuecomment-705090582


   @vvcephei This PR should fix the NPE mentioned in KAFKA-10515. Some tests are failing, but they appear to be unrelated. Can someone review this PR?





[GitHub] [kafka] vvcephei commented on pull request #9338: KAFKA-10515: Properly initialize nullable Serdes with default values

Posted by GitBox <gi...@apache.org>.
vvcephei commented on pull request #9338:
URL: https://github.com/apache/kafka/pull/9338#issuecomment-712406549


   Hello @thake ,
   
   I don't mean to bother you, but just wanted to check if you're able to follow up on the last round of review. We have until Wednesday to make the 2.7.0 code freeze.
   
   Thanks again,
   -John





[GitHub] [kafka] vvcephei commented on pull request #9338: KAFKA-10515: Properly initialize nullable Serdes with default values

Posted by GitBox <gi...@apache.org>.
vvcephei commented on pull request #9338:
URL: https://github.com/apache/kafka/pull/9338#issuecomment-713634414


   Thanks. I haven't seen any messages since Mickael volunteered on 1 Oct, so I guess there's still time to sneak it in :)
   
   I'll take a look at #9467 now.





[GitHub] [kafka] vvcephei commented on pull request #9338: KAFKA-10515: Properly initialize nullable Serdes with default values

Posted by GitBox <gi...@apache.org>.
vvcephei commented on pull request #9338:
URL: https://github.com/apache/kafka/pull/9338#issuecomment-713173834


   Looks like there are a bunch of merge conflicts for 2.6, probably because trunk/2.7 are post-KIP-478.
   
   I think since 2.7 is in progress right now, we could just stop here. If you want to have this in 2.6 and before, can you send a new PR for it, @thake ?





[GitHub] [kafka] thake commented on pull request #9338: Fixed KAFKA-10515: Serdes used within metered state stores will now be initialized with the default serdes if not already set.

Posted by GitBox <gi...@apache.org>.
thake commented on pull request #9338:
URL: https://github.com/apache/kafka/pull/9338#issuecomment-699857289


   The remaining failing tests do not seem to be related to this PR.





[GitHub] [kafka] vvcephei commented on pull request #9338: Fixed KAFKA-10515: Serdes used within metered state stores will now be initialized with the default serdes if not already set.

Posted by GitBox <gi...@apache.org>.
vvcephei commented on pull request #9338:
URL: https://github.com/apache/kafka/pull/9338#issuecomment-706474125


   Thanks, @thake .
   
   Ah, sorry about that: it was my change, and now I feel extra guilty for not reviewing and merging your fix in a timely fashion.
   
   It was intentional not to link those two interfaces with a supertype. I assume that the duplication you're concerned about is in WrappingNullables? I wonder if you can avoid passing in the context to those utility methods and only pass in the `contextKeySerializer`, etc. You could simply get these at the call site by adding some more extractors to ProcessorContextUtils.
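A rough sketch of the suggested shape: the helper receives only the default key and value serializers, and the call site pulls them out of the context with small extractor methods. All names here (`SimpleSerializer`, `WrappingSerializer`, `SketchContext`, `SketchContextUtils`, `SketchWrappingNullables`, `NullableWrapper`) are hypothetical simplifications, not the Kafka Streams classes. A deserializer variant would mirror `prepareKeySerializer` with deserializer types; that parallel method is the duplication that remains when the two interfaces share no supertype.

```java
// Hypothetical, simplified stand-ins; these are not the Kafka Streams classes.
interface SimpleSerializer<T> { byte[] serialize(T data); }

interface WrappingSerializer<T, K, V> extends SimpleSerializer<T> {
    void setIfUnset(SimpleSerializer<K> defaultKey, SimpleSerializer<V> defaultValue);
}

// Hypothetical context holding the application's default serializers.
class SketchContext {
    final SimpleSerializer<?> keySerializer;
    final SimpleSerializer<?> valueSerializer;

    SketchContext(final SimpleSerializer<?> keySerializer, final SimpleSerializer<?> valueSerializer) {
        this.keySerializer = keySerializer;
        this.valueSerializer = valueSerializer;
    }
}

// Extractors used at the call site, in the spirit of the suggested
// ProcessorContextUtils additions.
final class SketchContextUtils {
    static SimpleSerializer<?> keySerializer(final SketchContext context) { return context.keySerializer; }

    static SimpleSerializer<?> valueSerializer(final SketchContext context) { return context.valueSerializer; }
}

// The helper only sees the default serializers, never the context itself.
final class SketchWrappingNullables {
    @SuppressWarnings({"unchecked", "rawtypes"})
    static <K> SimpleSerializer<K> prepareKeySerializer(final SimpleSerializer<K> specific,
                                                       final SimpleSerializer<?> contextKey,
                                                       final SimpleSerializer<?> contextValue) {
        if (specific == null) {
            // Nothing specified for this store: fall back to the default key serializer.
            return (SimpleSerializer<K>) contextKey;
        }
        if (specific instanceof WrappingSerializer) {
            // Fill in any inner serializers the wrapper is still missing.
            ((WrappingSerializer) specific).setIfUnset(contextKey, contextValue);
        }
        return specific;
    }
}

// Example wrapper whose inner serializer comes from the defaults.
class NullableWrapper implements WrappingSerializer<String, String, String> {
    SimpleSerializer<String> inner;

    @Override
    public void setIfUnset(final SimpleSerializer<String> defaultKey, final SimpleSerializer<String> defaultValue) {
        if (inner == null) inner = defaultKey;
    }

    @Override
    public byte[] serialize(final String data) { return data == null ? null : inner.serialize(data); }
}
```

Because the helper takes plain serializers, the same method can be called from any store that can extract defaults from its context, without the helper depending on a particular context type.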
   
   Since this was my fault to begin with, I'm also happy to take a crack at it next week if it proves to be a pain.
   
   For what it's worth, I quite like your approach in this PR.
   
   Thanks again,
   -John





[GitHub] [kafka] vvcephei commented on a change in pull request #9338: KAFKA-10515: Properly initialize nullable Serdes with default values

Posted by GitBox <gi...@apache.org>.
vvcephei commented on a change in pull request #9338:
URL: https://github.com/apache/kafka/pull/9338#discussion_r504936524



##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/WrappingNullableSerde.java
##########
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.Serializer;
+
+public interface WrappingNullableSerde<T, InnerK, InnerV> extends Serde<T> {
+    @SuppressWarnings({"unchecked", "rawtypes"})
+    default void setIfUnset(final Serde<InnerK> defaultKeySerde, final Serde<InnerV> defaultValueSerde) {
+        final Serializer<T> serializer = this.serializer();

Review comment:
       No worries, I think this is actually always ok in practice, since I think that all the Wrapping implementations keep their de/serializers in fields anyway. But then again, we've had _so_ many bugs with serdes that I feel a bit twitchy about just ignoring the potential for a future bug that I happened to notice here.
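To make the potential bug concrete, here is a small self-contained sketch; all class names (`SimpleSerializer`, `LazyInnerSerializer`, `FreshInstanceSerde`, `FieldBackedSerde`, `SetIfUnsetPitfall`) are hypothetical, not Kafka's. If a serde's `serializer()` hands out a new instance on each call, a default method that configures `this.serializer()` only configures a throwaway object. Keeping the serializer in a field, as the existing Wrapping implementations are described as doing, avoids that.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical, simplified stand-in for Kafka's Serializer.
interface SimpleSerializer<T> { byte[] serialize(T data); }

// A wrapping serializer whose inner serializer is null until configured.
class LazyInnerSerializer implements SimpleSerializer<String> {
    SimpleSerializer<String> inner;

    void setIfUnset(final SimpleSerializer<String> defaultInner) {
        if (inner == null) inner = defaultInner;
    }

    @Override
    public byte[] serialize(final String data) {
        return data == null ? null : inner.serialize(data); // NPE if never configured
    }
}

// Fragile: hands out a brand-new serializer on every call.
class FreshInstanceSerde {
    LazyInnerSerializer serializer() { return new LazyInnerSerializer(); }
}

// Robust: keeps the serializer in a field, so configuration sticks.
class FieldBackedSerde {
    private final LazyInnerSerializer serializer = new LazyInnerSerializer();

    LazyInnerSerializer serializer() { return serializer; }
}

final class SetIfUnsetPitfall {
    static final SimpleSerializer<String> UTF8 = s -> s.getBytes(StandardCharsets.UTF_8);

    // Mirrors a default method that configures whatever serializer() returns.
    static boolean freshInstanceStaysConfigured() {
        final FreshInstanceSerde serde = new FreshInstanceSerde();
        serde.serializer().setIfUnset(UTF8);     // configures a throwaway instance
        return serde.serializer().inner != null; // a new, unconfigured instance
    }

    static boolean fieldBackedStaysConfigured() {
        final FieldBackedSerde serde = new FieldBackedSerde();
        serde.serializer().setIfUnset(UTF8);     // configures the field
        return serde.serializer().inner != null; // same instance as above
    }
}
```

With `FreshInstanceSerde`, the second `serializer()` call returns an unconfigured object, so serializing a non-null value through it would throw a `NullPointerException`; `FieldBackedSerde` keeps the configured instance.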







[GitHub] [kafka] thake commented on pull request #9338: KAFKA-10515: Properly initialize nullable Serdes with default values

Posted by GitBox <gi...@apache.org>.
thake commented on pull request #9338:
URL: https://github.com/apache/kafka/pull/9338#issuecomment-713298460


   Created #9467 for branch 2.6





[GitHub] [kafka] vvcephei commented on pull request #9338: KAFKA-10515: Properly initialize nullable Serdes with default values

Posted by GitBox <gi...@apache.org>.
vvcephei commented on pull request #9338:
URL: https://github.com/apache/kafka/pull/9338#issuecomment-713165968


   Unrelated test failure: `Build / JDK 11 / org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationTest.shouldUpgradeFromEosAlphaToEosBeta[true]`





[GitHub] [kafka] vvcephei merged pull request #9338: KAFKA-10515: Properly initialize nullable Serdes with default values

Posted by GitBox <gi...@apache.org>.
vvcephei merged pull request #9338:
URL: https://github.com/apache/kafka/pull/9338


   





[GitHub] [kafka] thake commented on pull request #9338: Fixed KAFKA-10515: Serdes used within metered state stores will now be initialized with the default serdes if not already set.

Posted by GitBox <gi...@apache.org>.
thake commented on pull request #9338:
URL: https://github.com/apache/kafka/pull/9338#issuecomment-707906748


   Thank you very much for taking so much time to discuss the API change! This really shows a very good attitude towards new contributors and their feedback. 
   
   Your arguments make sense to me, and as always every design decision has its trade-offs. With my earlier comment, I just wanted to give you feedback as one of the "early users" of the API. 
   
   Once again, thank you very much for your time and effort!

