You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/12/10 17:54:38 UTC

[GitHub] [kafka] vvcephei commented on a change in pull request #9708: [KAFKA-9126]: KIP-689: Add options to enable/disable logging for `StreamJoined`

vvcephei commented on a change in pull request #9708:
URL: https://github.com/apache/kafka/pull/9708#discussion_r540375321



##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/StreamJoined.java
##########
@@ -267,7 +296,49 @@ private StreamJoined(final Serde<K> keySerde,
             thisStoreSupplier,
             otherStoreSupplier,
             name,
-            storeName
+            storeName,
+            loggingEnabled,
+            topicConfig
+        );
+    }
+
+    /**
+     * Configures logging for both state stores. The changelog will be created with the provided configs.
+     * <p>
+     * Note: Any unrecognized configs will be ignored
+     * @param config  configs applied to the changelog topic
+     * @return            a new {@link StreamJoined} configured with logging enabled
+     */
+    public StreamJoined<K, V1, V2> withLoggingEnabled(final Map<String, String> config) {
+
+        return new StreamJoined<>(
+            keySerde,
+            valueSerde,
+            otherValueSerde,
+            thisStoreSupplier,
+            otherStoreSupplier,
+            name,
+            storeName,
+            true,
+            config
+        );
+    }
+
+    /**
+     * Disable change logging for both state stores.
+     * @return            a new {@link StreamJoined} configured with logging disabled
+     */
+    public StreamJoined<K, V1, V2> withLoggingDisabled() {
+        return new StreamJoined<>(
+            keySerde,
+            valueSerde,
+            otherValueSerde,
+            thisStoreSupplier,
+            otherStoreSupplier,
+            name,
+            storeName,
+            false,
+            null

Review comment:
       I think you meant this here:
   
   ```suggestion
               new HashMap<>()
   ```

##########
File path: streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java
##########
@@ -152,6 +155,52 @@ private void shouldLogAndMeterOnSkippedRecordsWithNullValue(final String builtIn
         }
     }
 
+    @Test
+    public void shouldDisableLoggingOnStreamJoined() {
+
+        final JoinWindows joinWindows = JoinWindows.of(ofMillis(100)).grace(Duration.ofMillis(50));
+        final StreamJoined<String, Integer, Integer> streamJoined = StreamJoined.with(Serdes.String(), Serdes.Integer(), Serdes.Integer()).withStoreName("store").withLoggingDisabled();
+
+        final StreamsBuilder builder = new StreamsBuilder();
+        final KStream<String, Integer> left = builder.stream("left", Consumed.with(Serdes.String(), Serdes.Integer()));
+        final KStream<String, Integer> right = builder.stream("right", Consumed.with(Serdes.String(), Serdes.Integer()));
+
+        left.join(
+            right,
+            (value1, value2) -> value1 + value2,
+            joinWindows,
+            streamJoined);
+
+        final Topology topology = builder.build();
+        final InternalTopologyBuilder internalTopologyBuilder = TopologyWrapper.getInternalTopologyBuilder(topology);
+
+        assertThat(internalTopologyBuilder.stateStores().get("store-this-join-store").loggingEnabled(), equalTo(false));
+        assertThat(internalTopologyBuilder.stateStores().get("store-other-join-store").loggingEnabled(), equalTo(false));
+    }
+
+    @Test
+    public void shouldEnableLoggingWithCustomConfigOnStreamJoined() {
+
+        final JoinWindows joinWindows = JoinWindows.of(ofMillis(100)).grace(Duration.ofMillis(50));
+        final StreamJoined<String, Integer, Integer> streamJoined = StreamJoined.with(Serdes.String(), Serdes.Integer(), Serdes.Integer()).withStoreName("store").withLoggingEnabled(Collections.emptyMap());
+
+        final StreamsBuilder builder = new StreamsBuilder();
+        final KStream<String, Integer> left = builder.stream("left", Consumed.with(Serdes.String(), Serdes.Integer()));
+        final KStream<String, Integer> right = builder.stream("right", Consumed.with(Serdes.String(), Serdes.Integer()));
+
+        left.join(
+            right,
+            (value1, value2) -> value1 + value2,
+            joinWindows,
+            streamJoined);
+
+        final Topology topology = builder.build();
+        final InternalTopologyBuilder internalTopologyBuilder = TopologyWrapper.getInternalTopologyBuilder(topology);
+
+        assertThat(internalTopologyBuilder.stateStores().get("store-this-join-store").loggingEnabled(), equalTo(true));
+        assertThat(internalTopologyBuilder.stateStores().get("store-other-join-store").loggingEnabled(), equalTo(true));

Review comment:
       It seems like these would pass by default. Maybe we should check set some log configs and then check that they got propagated ? 




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org