Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/11/10 21:24:02 UTC

[GitHub] [kafka] vvcephei commented on a change in pull request #11481: KAFKA-13117: migrate TupleForwarder and CacheFlushListener to new Record API

vvcephei commented on a change in pull request #11481:
URL: https://github.com/apache/kafka/pull/11481#discussion_r746983596



##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionTupleForwarder.java
##########
@@ -46,14 +46,14 @@
         cachingEnabled = ((WrappedStateStore) store).setFlushListener(flushListener, sendOldValues);
     }
 
-    public void maybeForward(final Windowed<K> key,
-                             final V newValue,
-                             final V oldValue) {
+    public void maybeForward(final Record<Windowed<K>, Change<V>> record) {
         if (!cachingEnabled) {
-            context.forward(new Record<>(
-                key,
-                new Change<>(newValue, sendOldValues ? oldValue : null),
-                key.window().end()));
+            context.forward(
+                record.withValue(
+                        new Change<>(
+                            record.value().newValue,
+                            sendOldValues ? record.value().oldValue : null))

Review comment:
       Huh, does it make sense for the forwarder to check sendOldValues? It seems like the pattern for forwarders is to push that check into the Processor. Then again, it may not be worth messing with it in this PR.

##########
File path: streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionTupleForwarderTest.java
##########
@@ -96,7 +97,11 @@ public void shouldNotForwardRecordsIfWrappedStateStoreDoesCache() {
         replay(store, context);
 
         new SessionTupleForwarder<>(store, context, null, false)
-            .maybeForward(new Windowed<>("key", new SessionWindow(21L, 42L)), "value", "oldValue");
+            .maybeForward(
+                new Record<>(
+                    new Windowed<>("key", new SessionWindow(21L, 42L)),
+                    new Change<>("value", "oldValue"),
+                    0L));

Review comment:
       Is this timestamp significant?

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##########
@@ -432,10 +432,10 @@ private void createCurrentRecordRightWindow(final long inputRecordTimestamp,
                 rightWinAgg,
                 window.start());
             tupleForwarder.maybeForward(
-                new Windowed<>(key, window),
-                rightWinAgg.value(),
-                null,
-                rightWinAgg.timestamp());
+                new Record<>(
+                    new Windowed<>(key, window),
+                    new Change<>(rightWinAgg.value(), null),
+                    rightWinAgg.timestamp()));

Review comment:
       This is a faithful replacement of the old code, but I'm wondering if you also want to take the opportunity to preserve the record headers, etc., by passing the current Record into this method and using `record.withKey(...).withValue(...)` as you do elsewhere in this PR.
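
       To illustrate the difference, here is a minimal sketch. `SketchRecord` is a simplified, hypothetical stand-in for the new `Record` class (headers are modelled as plain strings), and the `rebuilt`/`derived` helpers and the `"window:"` key prefix are made up for the example. It only assumes that a record built from scratch starts with empty headers, while the `with...` methods copy them over.

   ```java
   import java.util.ArrayList;
   import java.util.Arrays;
   import java.util.Collections;
   import java.util.List;

   // Simplified stand-in for the new Record API (an assumption for illustration only).
   final class SketchRecord<K, V> {
       final K key;
       final V value;
       final long timestamp;
       final List<String> headers;

       // Building a record from scratch starts with empty headers.
       SketchRecord(final K key, final V value, final long timestamp) {
           this(key, value, timestamp, Collections.emptyList());
       }

       SketchRecord(final K key, final V value, final long timestamp, final List<String> headers) {
           this.key = key;
           this.value = value;
           this.timestamp = timestamp;
           this.headers = Collections.unmodifiableList(new ArrayList<>(headers));
       }

       // Each with... method copies every other field, including the headers.
       <NewK> SketchRecord<NewK, V> withKey(final NewK newKey) {
           return new SketchRecord<>(newKey, value, timestamp, headers);
       }

       <NewV> SketchRecord<K, NewV> withValue(final NewV newValue) {
           return new SketchRecord<>(key, newValue, timestamp, headers);
       }

       SketchRecord<K, V> withTimestamp(final long newTimestamp) {
           return new SketchRecord<>(key, value, newTimestamp, headers);
       }
   }

   final class HeaderPreservationSketch {
       // Builds the outgoing record from scratch: the input's headers are dropped.
       static SketchRecord<String, Integer> rebuilt(final SketchRecord<String, String> in,
                                                    final int agg,
                                                    final long ts) {
           return new SketchRecord<>("window:" + in.key, agg, ts);
       }

       // Derives the outgoing record from the input: the headers carry over.
       static SketchRecord<String, Integer> derived(final SketchRecord<String, String> in,
                                                    final int agg,
                                                    final long ts) {
           return in.withKey("window:" + in.key).withValue(agg).withTimestamp(ts);
       }

       public static void main(final String[] args) {
           final SketchRecord<String, String> in =
               new SketchRecord<>("key", "value", 5L, Arrays.asList("trace-id=abc"));
           System.out.println("rebuilt headers: " + rebuilt(in, 1, 10L).headers);
           System.out.println("derived headers: " + derived(in, 1, 10L).headers);
       }
   }
   ```

       Both helpers end up with the same key, value, and timestamp; only the derived record still carries the input's headers.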

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
##########
@@ -105,10 +107,10 @@ private void putAndMaybeForward(final ThreadCache.DirtyEntry entry,
                 context.setRecordContext(entry.entry().context());
                 try {
                     flushListener.apply(
-                        entry.key().get(),
-                        rawNewValue,
-                        sendOldValues ? rawOldValue : null,
-                        entry.entry().context().timestamp());
+                        new Record<>(
+                            entry.key().get(),
+                            new Change<>(rawNewValue, sendOldValues ? rawOldValue : null),
+                            entry.entry().context().timestamp()));

Review comment:
       Similar to the earlier question, does it make sense to go ahead and add headers in now as well?

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinMerger.java
##########
@@ -119,7 +119,7 @@ public void init(final ProcessorContext<K, Change<V>> context) {
         public void process(final Record<K, Change<V>> record) {
             if (queryableName != null) {
                 store.put(record.key(), ValueAndTimestamp.make(record.value().newValue, record.timestamp()));
-                tupleForwarder.maybeForward(record.key(), record.value().newValue, sendOldValues ? record.value().oldValue : null);
+                tupleForwarder.maybeForward(record.withValue(new Change<>(record.value().newValue, sendOldValues ? record.value().oldValue : null)));

Review comment:
       Huh. I bet it would be ok to just:
   ```suggestion
                   tupleForwarder.maybeForward(record);
   ```

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##########
@@ -484,10 +484,10 @@ private void updateWindowAndForward(final Window window,
                     ValueAndTimestamp.make(newAgg, newTimestamp),
                     windowStart);
                 tupleForwarder.maybeForward(
-                    new Windowed<>(key, window),
-                    newAgg,
-                    sendOldValues ? oldAgg : null,
-                    newTimestamp);
+                    new Record<>(
+                        new Windowed<>(key, window),
+                        new Change<>(newAgg, oldAgg),

Review comment:
       Looks like we lost the "send old values" check here.
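
       For reference, a minimal sketch of the guard the old call site applied. `SketchChange` is a simplified, hypothetical stand-in for `Change`, and `changeFor` is a made-up helper, not something in this PR.

   ```java
   // Simplified stand-in for the Change class (an assumption for illustration only).
   final class SketchChange<V> {
       final V newValue;
       final V oldValue;

       SketchChange(final V newValue, final V oldValue) {
           this.newValue = newValue;
           this.oldValue = oldValue;
       }
   }

   final class SendOldValuesSketch {
       // Hypothetical helper: forwards the old aggregate only when sendOldValues is set,
       // mirroring the `sendOldValues ? oldAgg : null` expression from the removed lines.
       static <V> SketchChange<V> changeFor(final V newAgg, final V oldAgg, final boolean sendOldValues) {
           return new SketchChange<>(newAgg, sendOldValues ? oldAgg : null);
       }

       public static void main(final String[] args) {
           System.out.println(changeFor(2L, 1L, true).oldValue);
           System.out.println(changeFor(2L, 1L, false).oldValue);
       }
   }
   ```

       In the hunk above, that would amount to passing `sendOldValues ? oldAgg : null` as the second argument to `new Change<>(...)`.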

##########
File path: streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimestampedTupleForwarderTest.java
##########
@@ -66,11 +66,11 @@ private void shouldForwardRecordsIfWrappedStateStoreDoesNotCache(final boolean s
 
         expect(store.setFlushListener(null, sendOldValues)).andReturn(false);
         if (sendOldValues) {
-            context.forward("key1", new Change<>("newValue1",  "oldValue1"));
-            context.forward("key2", new Change<>("newValue2",  "oldValue2"), To.all().withTimestamp(42L));
+            context.forward(new Record<>("key1", new Change<>("newValue1",  "oldValue1"), 0L));

Review comment:
       Same question about these timestamps... For the ones that weren't present before, does it matter what we set?

##########
File path: streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionTupleForwarderTest.java
##########
@@ -80,9 +80,10 @@ private void shouldForwardRecordsIfWrappedStateStoreDoesNotCache(final boolean s
         new SessionTupleForwarder<>(store, context, null,
             sendOldValued)
             .maybeForward(
-                new Windowed<>("key", new SessionWindow(21L, 42L)),
-                "value",
-                "oldValue");
+                new Record<>(
+                    new Windowed<>("key", new SessionWindow(21L, 42L)),
+                    new Change<>("value", "oldValue"),
+                    42L));

Review comment:
       Is this timestamp significant?

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimestampedCacheFlushListener.java
##########
@@ -37,41 +37,20 @@
         myNode = this.context.currentNode();
     }
 
-    @SuppressWarnings("unchecked")
-    TimestampedCacheFlushListener(final org.apache.kafka.streams.processor.ProcessorContext context) {
-        this.context = (InternalProcessorContext<KOut, Change<VOut>>) context;
-        myNode = this.context.currentNode();
-    }
-
-    @Override
-    public void apply(final KOut key,
-                      final ValueAndTimestamp<VOut> newValue,
-                      final ValueAndTimestamp<VOut> oldValue,
-                      final long timestamp) {
-        final ProcessorNode prev = context.currentNode();
-        context.setCurrentNode(myNode);
-        try {
-            context.forward(
-                key,
-                new Change<>(getValueOrNull(newValue), getValueOrNull(oldValue)),
-                To.all().withTimestamp(newValue != null ? newValue.timestamp() : timestamp));
-        } finally {
-            context.setCurrentNode(prev);
-        }
-    }
-
     @Override
     public void apply(final Record<KOut, Change<ValueAndTimestamp<VOut>>> record) {
         @SuppressWarnings("rawtypes") final ProcessorNode prev = context.currentNode();
         context.setCurrentNode(myNode);
         try {
             context.forward(
-                record.withValue(
-                    new Change<>(
-                        getValueOrNull(record.value().newValue),
-                        getValueOrNull(record.value().oldValue)
-                    )
-                )
+                record
+                    .withValue(
+                        new Change<>(
+                            getValueOrNull(record.value().newValue),
+                            getValueOrNull(record.value().oldValue)))
+                    .withTimestamp(
+                        record.value().newValue != null ? record.value().newValue.timestamp()
+                            : record.timestamp())

Review comment:
       Huh. It looks like the Record-based `apply` was missing this timestamp override before, eh?
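
       The rule the removed overload applied, and that this hunk restores, can be sketched as below. `SketchValueAndTimestamp` is a simplified, hypothetical stand-in for `ValueAndTimestamp`, and `forwardTimestamp` is a made-up helper name.

   ```java
   // Simplified stand-in for ValueAndTimestamp (an assumption for illustration only).
   final class SketchValueAndTimestamp<V> {
       final V value;
       final long timestamp;

       SketchValueAndTimestamp(final V value, final long timestamp) {
           this.value = value;
           this.timestamp = timestamp;
       }
   }

   final class FlushTimestampSketch {
       // Prefer the timestamp stored alongside the new value; fall back to the
       // record's own timestamp when there is no new value.
       static long forwardTimestamp(final SketchValueAndTimestamp<?> newValue, final long recordTimestamp) {
           return newValue != null ? newValue.timestamp : recordTimestamp;
       }

       public static void main(final String[] args) {
           System.out.println(forwardTimestamp(new SketchValueAndTimestamp<String>("v", 42L), 7L));
           System.out.println(forwardTimestamp(null, 7L));
       }
   }
   ```

       Without the override, the forwarded record would keep whatever timestamp the incoming record carried, even when the stored value has its own.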

##########
File path: streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java
##########
@@ -116,6 +116,7 @@ private void setup(final boolean enableCache) {
                 results.add(new KeyValueTimestamp<>(record.key(), record.value(), record.timestamp()));
             }
         };
+        context.setTime(0L);

Review comment:
       Interesting... Why did this need to be changed?



