Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/12/07 18:03:19 UTC

[GitHub] [kafka] lct45 opened a new pull request #9708: [KAFKA-9126]: KIP-689: Add options to enable/disable logging for `StreamJoined`

lct45 opened a new pull request #9708:
URL: https://github.com/apache/kafka/pull/9708


   Adds `withLoggingEnabled` and `withLoggingDisabled` to `StreamJoined`, giving it the same changelog logging flexibility as `Materialized`.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] vvcephei commented on pull request #9708: KAFKA-9126: KIP-689: StreamJoined changelog configuration

Posted by GitBox <gi...@apache.org>.
vvcephei commented on pull request #9708:
URL: https://github.com/apache/kafka/pull/9708#issuecomment-746468357


   Test failure was unrelated:
    Build / JDK 11 / org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationTest.shouldUpgradeFromEosAlphaToEosBeta[true]



[GitHub] [kafka] lct45 commented on pull request #9708: [KAFKA-9126]: KIP-689: Add options to enable/disable logging for `StreamJoined`

Posted by GitBox <gi...@apache.org>.
lct45 commented on pull request #9708:
URL: https://github.com/apache/kafka/pull/9708#issuecomment-744649605


   @cadonna for review



[GitHub] [kafka] vvcephei merged pull request #9708: KAFKA-9126: KIP-689: StreamJoined changelog configuration

Posted by GitBox <gi...@apache.org>.
vvcephei merged pull request #9708:
URL: https://github.com/apache/kafka/pull/9708


   



[GitHub] [kafka] vvcephei commented on a change in pull request #9708: [KAFKA-9126]: KIP-689: Add options to enable/disable logging for `StreamJoined`

Posted by GitBox <gi...@apache.org>.
vvcephei commented on a change in pull request #9708:
URL: https://github.com/apache/kafka/pull/9708#discussion_r540375321



##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/StreamJoined.java
##########
@@ -267,7 +296,49 @@ private StreamJoined(final Serde<K> keySerde,
             thisStoreSupplier,
             otherStoreSupplier,
             name,
-            storeName
+            storeName,
+            loggingEnabled,
+            topicConfig
+        );
+    }
+
+    /**
+     * Configures logging for both state stores. The changelog will be created with the provided configs.
+     * <p>
+     * Note: Any unrecognized configs will be ignored
+     * @param config  configs applied to the changelog topic
+     * @return            a new {@link StreamJoined} configured with logging enabled
+     */
+    public StreamJoined<K, V1, V2> withLoggingEnabled(final Map<String, String> config) {
+
+        return new StreamJoined<>(
+            keySerde,
+            valueSerde,
+            otherValueSerde,
+            thisStoreSupplier,
+            otherStoreSupplier,
+            name,
+            storeName,
+            true,
+            config
+        );
+    }
+
+    /**
+     * Disable change logging for both state stores.
+     * @return            a new {@link StreamJoined} configured with logging disabled
+     */
+    public StreamJoined<K, V1, V2> withLoggingDisabled() {
+        return new StreamJoined<>(
+            keySerde,
+            valueSerde,
+            otherValueSerde,
+            thisStoreSupplier,
+            otherStoreSupplier,
+            name,
+            storeName,
+            false,
+            null

Review comment:
       I think you meant this here:
   
   ```suggestion
               new HashMap<>()
   ```
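
A minimal, self-contained sketch of why an empty map can be a safer default than `null` in this position. `ChangelogOptions` and its methods are hypothetical stand-ins that only model the two fields under discussion; this is not the real `StreamJoined` class.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the two logging-related fields discussed above.
// This is NOT the real StreamJoined class.
final class ChangelogOptions {
    final boolean loggingEnabled;
    final Map<String, String> topicConfig;

    private ChangelogOptions(final boolean loggingEnabled, final Map<String, String> topicConfig) {
        this.loggingEnabled = loggingEnabled;
        this.topicConfig = topicConfig;
    }

    static ChangelogOptions enabled(final Map<String, String> config) {
        // Copy so that later changes to the caller's map do not leak in.
        return new ChangelogOptions(true, new HashMap<>(config));
    }

    static ChangelogOptions disabledWithNull() {
        // Mirrors the original diff: readers must null-check before touching the config.
        return new ChangelogOptions(false, null);
    }

    static ChangelogOptions disabledWithEmptyMap() {
        // Mirrors the suggestion: the config is always safe to read.
        return new ChangelogOptions(false, new HashMap<>());
    }

    // A reader that does not check loggingEnabled before reading the config.
    static int configSize(final ChangelogOptions options) {
        return options.topicConfig.size();
    }
}
```

With the empty-map variant, `configSize` returns 0 when logging is disabled; with the `null` variant, the same call throws a `NullPointerException` unless every reader checks `loggingEnabled` first.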

##########
File path: streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java
##########
@@ -152,6 +155,52 @@ private void shouldLogAndMeterOnSkippedRecordsWithNullValue(final String builtIn
         }
     }
 
+    @Test
+    public void shouldDisableLoggingOnStreamJoined() {
+
+        final JoinWindows joinWindows = JoinWindows.of(ofMillis(100)).grace(Duration.ofMillis(50));
+        final StreamJoined<String, Integer, Integer> streamJoined = StreamJoined.with(Serdes.String(), Serdes.Integer(), Serdes.Integer()).withStoreName("store").withLoggingDisabled();
+
+        final StreamsBuilder builder = new StreamsBuilder();
+        final KStream<String, Integer> left = builder.stream("left", Consumed.with(Serdes.String(), Serdes.Integer()));
+        final KStream<String, Integer> right = builder.stream("right", Consumed.with(Serdes.String(), Serdes.Integer()));
+
+        left.join(
+            right,
+            (value1, value2) -> value1 + value2,
+            joinWindows,
+            streamJoined);
+
+        final Topology topology = builder.build();
+        final InternalTopologyBuilder internalTopologyBuilder = TopologyWrapper.getInternalTopologyBuilder(topology);
+
+        assertThat(internalTopologyBuilder.stateStores().get("store-this-join-store").loggingEnabled(), equalTo(false));
+        assertThat(internalTopologyBuilder.stateStores().get("store-other-join-store").loggingEnabled(), equalTo(false));
+    }
+
+    @Test
+    public void shouldEnableLoggingWithCustomConfigOnStreamJoined() {
+
+        final JoinWindows joinWindows = JoinWindows.of(ofMillis(100)).grace(Duration.ofMillis(50));
+        final StreamJoined<String, Integer, Integer> streamJoined = StreamJoined.with(Serdes.String(), Serdes.Integer(), Serdes.Integer()).withStoreName("store").withLoggingEnabled(Collections.emptyMap());
+
+        final StreamsBuilder builder = new StreamsBuilder();
+        final KStream<String, Integer> left = builder.stream("left", Consumed.with(Serdes.String(), Serdes.Integer()));
+        final KStream<String, Integer> right = builder.stream("right", Consumed.with(Serdes.String(), Serdes.Integer()));
+
+        left.join(
+            right,
+            (value1, value2) -> value1 + value2,
+            joinWindows,
+            streamJoined);
+
+        final Topology topology = builder.build();
+        final InternalTopologyBuilder internalTopologyBuilder = TopologyWrapper.getInternalTopologyBuilder(topology);
+
+        assertThat(internalTopologyBuilder.stateStores().get("store-this-join-store").loggingEnabled(), equalTo(true));
+        assertThat(internalTopologyBuilder.stateStores().get("store-other-join-store").loggingEnabled(), equalTo(true));

Review comment:
       It seems like these would pass by default. Maybe we should set some log configs and then check that they got propagated?





[GitHub] [kafka] lct45 commented on pull request #9708: [KAFKA-9126]: KIP-689: Add options to enable/disable logging for `StreamJoined`

Posted by GitBox <gi...@apache.org>.
lct45 commented on pull request #9708:
URL: https://github.com/apache/kafka/pull/9708#issuecomment-742742254


   @vvcephei Thanks for the review! Overall, the test coverage looks pretty good. IIUC, the log config is never touched when logging is disabled, so I don't think there's an NPE that we're missing.



[GitHub] [kafka] cadonna commented on a change in pull request #9708: [KAFKA-9126]: KIP-689: Add options to enable/disable logging for `StreamJoined`

Posted by GitBox <gi...@apache.org>.
cadonna commented on a change in pull request #9708:
URL: https://github.com/apache/kafka/pull/9708#discussion_r543222611



##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/StreamJoined.java
##########
@@ -36,6 +39,8 @@
     protected final WindowBytesStoreSupplier otherStoreSupplier;
     protected final String name;
     protected final String storeName;
+    protected boolean loggingEnabled;
+    protected Map<String, String> topicConfig;

Review comment:
       I guess both can be declared as `final`, right?
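
A minimal sketch of the pattern this comment points at: because every `with...` method returns a new instance, the fields never need to be reassigned and can be `final`. `JoinStoreSettings` is a hypothetical, simplified holder, not the real `StreamJoined`; the defensive copy in the constructor is an addition in this sketch and not something the PR is known to do.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified holder; the real StreamJoined has more fields.
final class JoinStoreSettings {
    private final String storeName;
    private final boolean loggingEnabled;
    private final Map<String, String> topicConfig;

    private JoinStoreSettings(final String storeName,
                              final boolean loggingEnabled,
                              final Map<String, String> topicConfig) {
        this.storeName = storeName;
        this.loggingEnabled = loggingEnabled;
        // Assumption of this sketch: keep an unmodifiable copy so the instance stays immutable.
        this.topicConfig = Collections.unmodifiableMap(new HashMap<>(topicConfig));
    }

    // Logging defaults to enabled, matching the "pass by default" remark in this thread.
    static JoinStoreSettings named(final String storeName) {
        return new JoinStoreSettings(storeName, true, Collections.emptyMap());
    }

    // Each "with" method builds a fresh instance instead of mutating this one.
    JoinStoreSettings withLoggingEnabled(final Map<String, String> config) {
        return new JoinStoreSettings(storeName, true, config);
    }

    JoinStoreSettings withLoggingDisabled() {
        return new JoinStoreSettings(storeName, false, Collections.emptyMap());
    }

    String storeName() {
        return storeName;
    }

    boolean loggingEnabled() {
        return loggingEnabled;
    }

    Map<String, String> topicConfig() {
        return topicConfig;
    }
}
```

Calling `withLoggingDisabled()` on an instance leaves the original untouched, which is what makes the `final` modifier compatible with the fluent API.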

##########
File path: streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java
##########
@@ -152,6 +155,52 @@ private void shouldLogAndMeterOnSkippedRecordsWithNullValue(final String builtIn
         }
     }
 
+    @Test
+    public void shouldDisableLoggingOnStreamJoined() {
+
+        final JoinWindows joinWindows = JoinWindows.of(ofMillis(100)).grace(Duration.ofMillis(50));
+        final StreamJoined<String, Integer, Integer> streamJoined = StreamJoined.with(Serdes.String(), Serdes.Integer(), Serdes.Integer()).withStoreName("store").withLoggingDisabled();
+
+        final StreamsBuilder builder = new StreamsBuilder();
+        final KStream<String, Integer> left = builder.stream("left", Consumed.with(Serdes.String(), Serdes.Integer()));
+        final KStream<String, Integer> right = builder.stream("right", Consumed.with(Serdes.String(), Serdes.Integer()));
+
+        left.join(
+            right,
+            (value1, value2) -> value1 + value2,
+            joinWindows,
+            streamJoined);

Review comment:
       ```suggestion
           left.join(
               right,
               (value1, value2) -> value1 + value2,
               joinWindows,
               streamJoined
           );
   ```

##########
File path: streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java
##########
@@ -152,6 +155,52 @@ private void shouldLogAndMeterOnSkippedRecordsWithNullValue(final String builtIn
         }
     }
 
+    @Test
+    public void shouldDisableLoggingOnStreamJoined() {
+
+        final JoinWindows joinWindows = JoinWindows.of(ofMillis(100)).grace(Duration.ofMillis(50));
+        final StreamJoined<String, Integer, Integer> streamJoined = StreamJoined.with(Serdes.String(), Serdes.Integer(), Serdes.Integer()).withStoreName("store").withLoggingDisabled();

Review comment:
       Suggestion:
   ```suggestion
           final StreamJoined<String, Integer, Integer> streamJoined = StreamJoined
               .with(Serdes.String(), Serdes.Integer(), Serdes.Integer())
               .withStoreName("store")
               .withLoggingDisabled();
   ```

##########
File path: streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java
##########
@@ -152,6 +155,52 @@ private void shouldLogAndMeterOnSkippedRecordsWithNullValue(final String builtIn
         }
     }
 
+    @Test
+    public void shouldDisableLoggingOnStreamJoined() {
+
+        final JoinWindows joinWindows = JoinWindows.of(ofMillis(100)).grace(Duration.ofMillis(50));
+        final StreamJoined<String, Integer, Integer> streamJoined = StreamJoined.with(Serdes.String(), Serdes.Integer(), Serdes.Integer()).withStoreName("store").withLoggingDisabled();
+
+        final StreamsBuilder builder = new StreamsBuilder();
+        final KStream<String, Integer> left = builder.stream("left", Consumed.with(Serdes.String(), Serdes.Integer()));
+        final KStream<String, Integer> right = builder.stream("right", Consumed.with(Serdes.String(), Serdes.Integer()));
+
+        left.join(
+            right,
+            (value1, value2) -> value1 + value2,
+            joinWindows,
+            streamJoined);
+
+        final Topology topology = builder.build();
+        final InternalTopologyBuilder internalTopologyBuilder = TopologyWrapper.getInternalTopologyBuilder(topology);
+
+        assertThat(internalTopologyBuilder.stateStores().get("store-this-join-store").loggingEnabled(), equalTo(false));
+        assertThat(internalTopologyBuilder.stateStores().get("store-other-join-store").loggingEnabled(), equalTo(false));
+    }
+
+    @Test
+    public void shouldEnableLoggingWithCustomConfigOnStreamJoined() {
+
+        final JoinWindows joinWindows = JoinWindows.of(ofMillis(100)).grace(Duration.ofMillis(50));
+        final StreamJoined<String, Integer, Integer> streamJoined = StreamJoined.with(Serdes.String(), Serdes.Integer(), Serdes.Integer()).withStoreName("store").withLoggingEnabled(Collections.emptyMap());
+
+        final StreamsBuilder builder = new StreamsBuilder();
+        final KStream<String, Integer> left = builder.stream("left", Consumed.with(Serdes.String(), Serdes.Integer()));
+        final KStream<String, Integer> right = builder.stream("right", Consumed.with(Serdes.String(), Serdes.Integer()));
+
+        left.join(
+            right,
+            (value1, value2) -> value1 + value2,
+            joinWindows,
+            streamJoined);
+
+        final Topology topology = builder.build();
+        final InternalTopologyBuilder internalTopologyBuilder = TopologyWrapper.getInternalTopologyBuilder(topology);
+
+        assertThat(internalTopologyBuilder.stateStores().get("store-this-join-store").loggingEnabled(), equalTo(true));
+        assertThat(internalTopologyBuilder.stateStores().get("store-other-join-store").loggingEnabled(), equalTo(true));

Review comment:
       Maybe there is an easier way, but I found the following:
   
   Set the config to:
   
   ```
           final StreamJoined<String, Integer, Integer> streamJoined = StreamJoined
               .with(Serdes.String(), Serdes.Integer(), Serdes.Integer())
               .withStoreName("store")
               .withLoggingEnabled(Collections.singletonMap("test", "property"));
   ```
   and then check it:
   
   ```
           internalTopologyBuilder.buildSubtopology(0);
   
           assertThat(internalTopologyBuilder.stateStores().get("store-this-join-store").loggingEnabled(), equalTo(true));
           assertThat(internalTopologyBuilder.stateStores().get("store-other-join-store").loggingEnabled(), equalTo(true));
           assertThat(internalTopologyBuilder.topicGroups().get(0).stateChangelogTopics.size(), equalTo(2));
           for (final InternalTopicConfig config : internalTopologyBuilder.topicGroups().get(0).stateChangelogTopics.values()) {
               assertThat(
                   config.getProperties(Collections.emptyMap(), 0).get("test"),
                   equalTo("property")
               );
           }
   ```
   
   Without
   
   ```
   assertThat(internalTopologyBuilder.topicGroups().get(0).stateChangelogTopics.size(), equalTo(2));
   ```
   
   the test would pass without checking the config if `buildSubtopology()` is not called, because no changelog topics would be registered in the topology and the loop body would never run. So the size assertion basically checks that `buildSubtopology()` was called.
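
The point about the size assertion can be sketched without any Kafka classes. In the illustration below, `VacuousCheckDemo` and its method names are hypothetical, and the nested map merely stands in for the collection of changelog topic configs; the sketch shows that a per-entry check over an empty collection succeeds without checking anything, and that requiring an expected entry count closes that gap.

```java
import java.util.Map;

// Hypothetical illustration; the nested map stands in for the changelog topic configs.
final class VacuousCheckDemo {

    // Returns true when every config carries the expected property.
    // For an empty map the loop body never runs, so this also returns true.
    static boolean allCarryProperty(final Map<String, Map<String, String>> changelogConfigs,
                                    final String key,
                                    final String expected) {
        for (final Map<String, String> config : changelogConfigs.values()) {
            if (!expected.equals(config.get(key))) {
                return false;
            }
        }
        return true;
    }

    // Guarded variant: first require the expected number of entries,
    // so an empty collection can no longer pass silently.
    static boolean allCarryPropertyGuarded(final Map<String, Map<String, String>> changelogConfigs,
                                           final String key,
                                           final String expected,
                                           final int expectedCount) {
        return changelogConfigs.size() == expectedCount
            && allCarryProperty(changelogConfigs, key, expected);
    }
}
```

With an empty map, `allCarryProperty` reports success while `allCarryPropertyGuarded` with an expected count of 2 reports failure, which mirrors the role of the `size()` assertion in the test above.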

##########
File path: streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java
##########
@@ -152,6 +155,52 @@ private void shouldLogAndMeterOnSkippedRecordsWithNullValue(final String builtIn
         }
     }
 
+    @Test
+    public void shouldDisableLoggingOnStreamJoined() {
+
+        final JoinWindows joinWindows = JoinWindows.of(ofMillis(100)).grace(Duration.ofMillis(50));
+        final StreamJoined<String, Integer, Integer> streamJoined = StreamJoined.with(Serdes.String(), Serdes.Integer(), Serdes.Integer()).withStoreName("store").withLoggingDisabled();
+
+        final StreamsBuilder builder = new StreamsBuilder();
+        final KStream<String, Integer> left = builder.stream("left", Consumed.with(Serdes.String(), Serdes.Integer()));
+        final KStream<String, Integer> right = builder.stream("right", Consumed.with(Serdes.String(), Serdes.Integer()));
+
+        left.join(
+            right,
+            (value1, value2) -> value1 + value2,
+            joinWindows,
+            streamJoined);
+
+        final Topology topology = builder.build();
+        final InternalTopologyBuilder internalTopologyBuilder = TopologyWrapper.getInternalTopologyBuilder(topology);
+
+        assertThat(internalTopologyBuilder.stateStores().get("store-this-join-store").loggingEnabled(), equalTo(false));
+        assertThat(internalTopologyBuilder.stateStores().get("store-other-join-store").loggingEnabled(), equalTo(false));
+    }
+
+    @Test
+    public void shouldEnableLoggingWithCustomConfigOnStreamJoined() {
+
+        final JoinWindows joinWindows = JoinWindows.of(ofMillis(100)).grace(Duration.ofMillis(50));
+        final StreamJoined<String, Integer, Integer> streamJoined = StreamJoined.with(Serdes.String(), Serdes.Integer(), Serdes.Integer()).withStoreName("store").withLoggingEnabled(Collections.emptyMap());

Review comment:
       ```suggestion
           final StreamJoined<String, Integer, Integer> streamJoined = StreamJoined
               .with(Serdes.String(), Serdes.Integer(), Serdes.Integer())
               .withStoreName("store")
               .withLoggingEnabled(Collections.emptyMap());
   ```

##########
File path: streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java
##########
@@ -152,6 +155,52 @@ private void shouldLogAndMeterOnSkippedRecordsWithNullValue(final String builtIn
         }
     }
 
+    @Test
+    public void shouldDisableLoggingOnStreamJoined() {
+
+        final JoinWindows joinWindows = JoinWindows.of(ofMillis(100)).grace(Duration.ofMillis(50));
+        final StreamJoined<String, Integer, Integer> streamJoined = StreamJoined.with(Serdes.String(), Serdes.Integer(), Serdes.Integer()).withStoreName("store").withLoggingDisabled();
+
+        final StreamsBuilder builder = new StreamsBuilder();
+        final KStream<String, Integer> left = builder.stream("left", Consumed.with(Serdes.String(), Serdes.Integer()));
+        final KStream<String, Integer> right = builder.stream("right", Consumed.with(Serdes.String(), Serdes.Integer()));
+
+        left.join(
+            right,
+            (value1, value2) -> value1 + value2,
+            joinWindows,
+            streamJoined);
+
+        final Topology topology = builder.build();
+        final InternalTopologyBuilder internalTopologyBuilder = TopologyWrapper.getInternalTopologyBuilder(topology);
+
+        assertThat(internalTopologyBuilder.stateStores().get("store-this-join-store").loggingEnabled(), equalTo(false));
+        assertThat(internalTopologyBuilder.stateStores().get("store-other-join-store").loggingEnabled(), equalTo(false));
+    }
+
+    @Test
+    public void shouldEnableLoggingWithCustomConfigOnStreamJoined() {
+
+        final JoinWindows joinWindows = JoinWindows.of(ofMillis(100)).grace(Duration.ofMillis(50));
+        final StreamJoined<String, Integer, Integer> streamJoined = StreamJoined.with(Serdes.String(), Serdes.Integer(), Serdes.Integer()).withStoreName("store").withLoggingEnabled(Collections.emptyMap());
+
+        final StreamsBuilder builder = new StreamsBuilder();
+        final KStream<String, Integer> left = builder.stream("left", Consumed.with(Serdes.String(), Serdes.Integer()));
+        final KStream<String, Integer> right = builder.stream("right", Consumed.with(Serdes.String(), Serdes.Integer()));
+
+        left.join(
+            right,
+            (value1, value2) -> value1 + value2,
+            joinWindows,
+            streamJoined);

Review comment:
       ```suggestion
           left.join(
               right,
               (value1, value2) -> value1 + value2,
               joinWindows,
               streamJoined
           );
   ```





[GitHub] [kafka] mjsax commented on pull request #9708: KAFKA-9126: KIP-689: StreamJoined changelog configuration

Posted by GitBox <gi...@apache.org>.
mjsax commented on pull request #9708:
URL: https://github.com/apache/kafka/pull/9708#issuecomment-764957937


   @lct45 -- It seems we forgot to update the docs, i.e., `streams/upgrade_guide.html`, for this KIP. Can you do a follow-up PR for it?



[GitHub] [kafka] mjsax commented on pull request #9708: KAFKA-9126: KIP-689: StreamJoined changelog configuration

Posted by GitBox <gi...@apache.org>.
mjsax commented on pull request #9708:
URL: https://github.com/apache/kafka/pull/9708#issuecomment-768685944


   Thanks @lct45!
   
   For reference: https://github.com/apache/kafka/pull/9951 



[GitHub] [kafka] lct45 commented on a change in pull request #9708: [KAFKA-9126]: KIP-689: Add options to enable/disable logging for `StreamJoined`

Posted by GitBox <gi...@apache.org>.
lct45 commented on a change in pull request #9708:
URL: https://github.com/apache/kafka/pull/9708#discussion_r540431368



##########
File path: streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java
##########
@@ -152,6 +155,52 @@ private void shouldLogAndMeterOnSkippedRecordsWithNullValue(final String builtIn
         }
     }
 
+    @Test
+    public void shouldDisableLoggingOnStreamJoined() {
+
+        final JoinWindows joinWindows = JoinWindows.of(ofMillis(100)).grace(Duration.ofMillis(50));
+        final StreamJoined<String, Integer, Integer> streamJoined = StreamJoined.with(Serdes.String(), Serdes.Integer(), Serdes.Integer()).withStoreName("store").withLoggingDisabled();
+
+        final StreamsBuilder builder = new StreamsBuilder();
+        final KStream<String, Integer> left = builder.stream("left", Consumed.with(Serdes.String(), Serdes.Integer()));
+        final KStream<String, Integer> right = builder.stream("right", Consumed.with(Serdes.String(), Serdes.Integer()));
+
+        left.join(
+            right,
+            (value1, value2) -> value1 + value2,
+            joinWindows,
+            streamJoined);
+
+        final Topology topology = builder.build();
+        final InternalTopologyBuilder internalTopologyBuilder = TopologyWrapper.getInternalTopologyBuilder(topology);
+
+        assertThat(internalTopologyBuilder.stateStores().get("store-this-join-store").loggingEnabled(), equalTo(false));
+        assertThat(internalTopologyBuilder.stateStores().get("store-other-join-store").loggingEnabled(), equalTo(false));
+    }
+
+    @Test
+    public void shouldEnableLoggingWithCustomConfigOnStreamJoined() {
+
+        final JoinWindows joinWindows = JoinWindows.of(ofMillis(100)).grace(Duration.ofMillis(50));
+        final StreamJoined<String, Integer, Integer> streamJoined = StreamJoined.with(Serdes.String(), Serdes.Integer(), Serdes.Integer()).withStoreName("store").withLoggingEnabled(Collections.emptyMap());
+
+        final StreamsBuilder builder = new StreamsBuilder();
+        final KStream<String, Integer> left = builder.stream("left", Consumed.with(Serdes.String(), Serdes.Integer()));
+        final KStream<String, Integer> right = builder.stream("right", Consumed.with(Serdes.String(), Serdes.Integer()));
+
+        left.join(
+            right,
+            (value1, value2) -> value1 + value2,
+            joinWindows,
+            streamJoined);
+
+        final Topology topology = builder.build();
+        final InternalTopologyBuilder internalTopologyBuilder = TopologyWrapper.getInternalTopologyBuilder(topology);
+
+        assertThat(internalTopologyBuilder.stateStores().get("store-this-join-store").loggingEnabled(), equalTo(true));
+        assertThat(internalTopologyBuilder.stateStores().get("store-other-join-store").loggingEnabled(), equalTo(true));

Review comment:
       Yeah, it does pass by default. I experimented with passing log configs in, but I wasn't able to find a good way to confirm that the configs I was passing were getting set somewhere. Do you know where they're exposed for me to check? `Materialized` doesn't seem to check this either.





[GitHub] [kafka] lct45 commented on pull request #9708: [KAFKA-9126]: KIP-689: Add options to enable/disable logging for `StreamJoined`

Posted by GitBox <gi...@apache.org>.
lct45 commented on pull request #9708:
URL: https://github.com/apache/kafka/pull/9708#issuecomment-745375628


   Thanks for the review and test update, @cadonna, and yes, the Scala code should still be good to go.



[GitHub] [kafka] lct45 commented on pull request #9708: KAFKA-9126: KIP-689: StreamJoined changelog configuration

Posted by GitBox <gi...@apache.org>.
lct45 commented on pull request #9708:
URL: https://github.com/apache/kafka/pull/9708#issuecomment-764993225


   @mjsax Ahh yeah I'll submit a PR, thanks for catching that

