Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2022/06/27 23:16:40 UTC

[GitHub] [kafka] mjsax commented on a diff in pull request #12204: [9/N][Emit final] Emit final for session window aggregations

mjsax commented on code in PR #12204:
URL: https://github.com/apache/kafka/pull/12204#discussion_r907875412


##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java:
##########
@@ -148,55 +173,182 @@ public void process(final Record<KIn, VIn> record) {
                 }
             }
 
-            if (mergedWindow.end() < closeTime) {
-                if (context().recordMetadata().isPresent()) {
-                    final RecordMetadata recordMetadata = context().recordMetadata().get();
-                    LOG.warn(
-                        "Skipping record for expired window. " +
-                            "topic=[{}] " +
-                            "partition=[{}] " +
-                            "offset=[{}] " +
-                            "timestamp=[{}] " +
-                            "window=[{},{}] " +
-                            "expiration=[{}] " +
-                            "streamTime=[{}]",
-                        recordMetadata.topic(), recordMetadata.partition(), recordMetadata.offset(),
-                        timestamp,
-                        mergedWindow.start(), mergedWindow.end(),
-                        closeTime,
-                        observedStreamTime
-                    );
-                } else {
-                    LOG.warn(
-                        "Skipping record for expired window. Topic, partition, and offset not known. " +
-                            "timestamp=[{}] " +
-                            "window=[{},{}] " +
-                            "expiration=[{}] " +
-                            "streamTime=[{}]",
-                        timestamp,
-                        mergedWindow.start(), mergedWindow.end(),
-                        closeTime,
-                        observedStreamTime
-                    );
-                }
-                droppedRecordsSensor.record();
+            if (mergedWindow.end() < windowCloseTime) {
+                logSkippedRecordForExpiredWindow(timestamp, windowCloseTime, mergedWindow);
             } else {
                 if (!mergedWindow.equals(newSessionWindow)) {
                     for (final KeyValue<Windowed<KIn>, VAgg> session : merged) {
                         store.remove(session.key);
+
+                        maybeForwardUpdate(session.key, session.value, null);
+                        /*
                         tupleForwarder.maybeForward(
                             record.withKey(session.key)
                                 .withValue(new Change<>(null, session.value)));
+                         */
                     }
                 }
 
                 agg = aggregator.apply(record.key(), record.value(), agg);
                 final Windowed<KIn> sessionKey = new Windowed<>(record.key(), mergedWindow);
                 store.put(sessionKey, agg);
+
+                maybeForwardUpdate(sessionKey, null, agg);
+                /*
                 tupleForwarder.maybeForward(
                     record.withKey(sessionKey)
                         .withValue(new Change<>(agg, null)));
+                 */
+            }
+
+            maybeForwardFinalResult(record, windowCloseTime);
+        }
+
+        private void maybeForwardUpdate(final Windowed<KIn> windowedkey,
+                                        final VAgg oldAgg,
+                                        final VAgg newAgg) {
+            if (emitStrategy.type() == EmitStrategy.StrategyType.ON_WINDOW_CLOSE) {
+                return;
+            }
+
+            // Update the sent record timestamp to the window end time if possible
+            final long newTimestamp = windowedkey.window().end();
+            tupleForwarder.maybeForward(new Record<>(windowedkey, new Change<>(newAgg, sendOldValues ? oldAgg : null), newTimestamp));
+        }
+
+        // TODO: consolidate SessionWindow with TimeWindow to merge common functions
+        private void maybeForwardFinalResult(final Record<KIn, VIn> record, final long windowCloseTime) {
+            if (shouldEmitFinal(windowCloseTime)) {
+                final long emitRangeUpperBound = emitRangeUpperBound(windowCloseTime);
+
+                // if the upper bound is smaller than 0, then there's no window closed ever;
+                // and we can skip range fetching
+                if (emitRangeUpperBound >= 0) {
+                    final long emitRangeLowerBound = emitRangeLowerBound();
+
+                    if (shouldRangeFetch(emitRangeLowerBound, emitRangeUpperBound)) {
+                        fetchAndEmit(record, windowCloseTime, emitRangeLowerBound, emitRangeUpperBound);
+                    }
+                }
+            }
+        }
+
+        private boolean shouldEmitFinal(final long windowCloseTime) {
+            if (emitStrategy.type() != EmitStrategy.StrategyType.ON_WINDOW_CLOSE) {
+                return false;
+            }
+
+            final long now = internalProcessorContext.currentSystemTimeMs();
+            // Throttle emit frequency
+            if (now < timeTracker.nextTimeToEmit) {
+                return false;
+            }
+
+            // Schedule next emit time based on now to avoid the case that if system time jumps a lot,
+            // this can be triggered every time
+            timeTracker.nextTimeToEmit = now;
+            timeTracker.advanceNextTimeToEmit();
+
+            // Only EMIT if the window close time does progress
+            return lastEmitWindowCloseTime == ConsumerRecord.NO_TIMESTAMP || lastEmitWindowCloseTime < windowCloseTime;
+        }
+
+        private long emitRangeLowerBound() {
+            return lastEmitWindowCloseTime == ConsumerRecord.NO_TIMESTAMP ? 0L : Math.max(0L, lastEmitWindowCloseTime);

Review Comment:
   nit: can this be simplified to `Math.max(0L, lastEmitWindowCloseTime)`? (This can also be addressed in a follow-up PR.)
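
   A quick way to sanity-check the suggestion: the sketch below compares the ternary form from the diff with the simplified form on a few sample values. It is a standalone illustration, not code from the PR. The class name `EmitRangeLowerBoundCheck` and the methods `original` and `simplified` are hypothetical, and the local `NO_TIMESTAMP` constant is an assumption that mirrors `ConsumerRecord.NO_TIMESTAMP` being `-1`.

```java
final class EmitRangeLowerBoundCheck {
    // Assumption: mirrors ConsumerRecord.NO_TIMESTAMP, taken to be -1 here.
    static final long NO_TIMESTAMP = -1L;

    // The form currently in the diff.
    static long original(final long lastEmitWindowCloseTime) {
        return lastEmitWindowCloseTime == NO_TIMESTAMP ? 0L : Math.max(0L, lastEmitWindowCloseTime);
    }

    // The suggested simplification.
    static long simplified(final long lastEmitWindowCloseTime) {
        return Math.max(0L, lastEmitWindowCloseTime);
    }

    public static void main(final String[] args) {
        final long[] samples = {NO_TIMESTAMP, Long.MIN_VALUE, -5L, 0L, 1L, 42L, Long.MAX_VALUE};
        for (final long sample : samples) {
            // Fail loudly if the two forms ever disagree.
            if (original(sample) != simplified(sample)) {
                throw new AssertionError("Mismatch for " + sample);
            }
        }
        System.out.println("Both forms agree on all samples");
    }
}
```

   Under that assumption the special case is redundant: any negative sentinel is already clamped to `0L` by `Math.max`, so the ternary only repeats what the clamp does.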



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
