Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2022/06/02 00:11:17 UTC

[GitHub] [kafka] mjsax commented on a diff in pull request #12204: [9/N WIP][Emit final] Emit final for session window aggregations

mjsax commented on code in PR #12204:
URL: https://github.com/apache/kafka/pull/12204#discussion_r887381498


##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java:
##########
@@ -83,50 +94,64 @@ private class KStreamSessionWindowAggregateProcessor extends
         ContextualProcessor<KIn, VIn, Windowed<KIn>, Change<VAgg>> {
 
         private SessionStore<KIn, VAgg> store;
-        private SessionTupleForwarder<KIn, VAgg> tupleForwarder;
+        private TimestampedTupleForwarder<Windowed<KIn>, VAgg> tupleForwarder;
         private Sensor droppedRecordsSensor;
+        private Sensor emittedRecordsSensor;
+        private Sensor emitFinalLatencySensor;
+        private long lastEmitWindowCloseTime = ConsumerRecord.NO_TIMESTAMP;
         private long observedStreamTime = ConsumerRecord.NO_TIMESTAMP;
+        private InternalProcessorContext<Windowed<KIn>, Change<VAgg>> internalProcessorContext;
+
+        private final Time time = Time.SYSTEM;

Review Comment:
   Wouldn't it be better to pass in a `Time` object, so we can mock it using TTD (`TopologyTestDriver`)?
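   
   A minimal sketch of that injection idea, for illustration (the class name and constructor below are hypothetical -- the point is replacing the hard-coded `Time.SYSTEM` field with a constructor parameter, so a test can supply a `MockTime`):
   
   ```java
   import org.apache.kafka.common.utils.MockTime;
   import org.apache.kafka.common.utils.Time;
   
   // Hypothetical fragment: the clock is injected rather than hard-coded.
   final class EmitFinalClockExample {
       private final Time time;
   
       EmitFinalClockExample(final Time time) {
           this.time = time; // production passes Time.SYSTEM; tests pass new MockTime()
       }
   
       long now() {
           return time.milliseconds();
       }
   }
   ```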



##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractKStreamTimeWindowAggregateProcessor.java:
##########
@@ -73,11 +73,10 @@ public void init(final ProcessorContext<Windowed<KIn>, Change<VAgg>> context) {
         internalProcessorContext = (InternalProcessorContext<Windowed<KIn>, Change<VAgg>>) context;
         final StreamsMetricsImpl metrics = internalProcessorContext.metrics();
         final String threadId = Thread.currentThread().getName();
+        final String processorName = internalProcessorContext.currentNode().name();
         droppedRecordsSensor = droppedRecordsSensor(threadId, context.taskId().toString(), metrics);
-        emittedRecordsSensor = emittedRecordsSensor(threadId, context.taskId().toString(),
-            internalProcessorContext.currentNode().name(), metrics);
-        emitFinalLatencySensor = emitFinalLatencySensor(threadId, context.taskId().toString(),
-            internalProcessorContext.currentNode().name(), metrics);
+        emittedRecordsSensor = emittedRecordsSensor(threadId, context.taskId().toString(), processorName, metrics);
+        emitFinalLatencySensor = emitFinalLatencySensor(threadId, context.taskId().toString(), processorName, metrics);

Review Comment:
   Thanks for all the cleanup -- though it's somewhat distracting from the actual changes.
   
   Can we (in the future) extract refactorings/cleanups into individual PRs to simplify reviewing?



##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java:
##########
@@ -148,55 +173,184 @@ public void process(final Record<KIn, VIn> record) {
                 }
             }
 
-            if (mergedWindow.end() < closeTime) {
-                if (context().recordMetadata().isPresent()) {
-                    final RecordMetadata recordMetadata = context().recordMetadata().get();
-                    LOG.warn(
-                        "Skipping record for expired window. " +
-                            "topic=[{}] " +
-                            "partition=[{}] " +
-                            "offset=[{}] " +
-                            "timestamp=[{}] " +
-                            "window=[{},{}] " +
-                            "expiration=[{}] " +
-                            "streamTime=[{}]",
-                        recordMetadata.topic(), recordMetadata.partition(), recordMetadata.offset(),
-                        timestamp,
-                        mergedWindow.start(), mergedWindow.end(),
-                        closeTime,
-                        observedStreamTime
-                    );
-                } else {
-                    LOG.warn(
-                        "Skipping record for expired window. Topic, partition, and offset not known. " +
-                            "timestamp=[{}] " +
-                            "window=[{},{}] " +
-                            "expiration=[{}] " +
-                            "streamTime=[{}]",
-                        timestamp,
-                        mergedWindow.start(), mergedWindow.end(),
-                        closeTime,
-                        observedStreamTime
-                    );
-                }
-                droppedRecordsSensor.record();
+            if (mergedWindow.end() < windowCloseTime) {
+                logSkippedRecordForExpiredWindow(timestamp, windowCloseTime, mergedWindow);
             } else {
                 if (!mergedWindow.equals(newSessionWindow)) {
                     for (final KeyValue<Windowed<KIn>, VAgg> session : merged) {
                         store.remove(session.key);
+
+                        maybeForwardUpdate(session.key, session.value, null, record.timestamp());
+                        /*
                         tupleForwarder.maybeForward(
                             record.withKey(session.key)
                                 .withValue(new Change<>(null, session.value)));
+                         */
                     }
                 }
 
                 agg = aggregator.apply(record.key(), record.value(), agg);
                 final Windowed<KIn> sessionKey = new Windowed<>(record.key(), mergedWindow);
                 store.put(sessionKey, agg);
+
+                maybeForwardUpdate(sessionKey, null, agg, record.timestamp());
+                /*
                 tupleForwarder.maybeForward(
                     record.withKey(sessionKey)
                         .withValue(new Change<>(agg, null)));
+                 */
             }
+
+            maybeForwardFinalResult(record, windowCloseTime);
+        }
+
+        private void maybeForwardUpdate(final Windowed<KIn> windowedkey,
+                                        final VAgg oldAgg,
+                                        final VAgg newAgg,
+                                        final long oldTimestamp) {
+            if (emitStrategy.type() == EmitStrategy.StrategyType.ON_WINDOW_CLOSE) {
+                return;
+            }
+
+            // Update the sent record timestamp to the window end time if possible
+            final long newTimestamp = windowedkey.key() != null ? windowedkey.window().end() : oldTimestamp;

Review Comment:
   For what case could `windowedkey.key() == null`? Is this even possible?
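   
   For context, a sketch of what the new update path appears to amount to, reconstructed from the diff above plus the commented-out forwards (an assumption about the intended final shape, not the literal PR code):
   
   ```java
   // Sketch (assumption): eager updates are suppressed for emit-on-window-close;
   // otherwise the update is forwarded with the timestamp bumped to the window end.
   private void maybeForwardUpdate(final Windowed<KIn> windowedkey,
                                   final VAgg oldAgg,
                                   final VAgg newAgg,
                                   final long oldTimestamp) {
       if (emitStrategy.type() == EmitStrategy.StrategyType.ON_WINDOW_CLOSE) {
           return; // final results are emitted separately, once the window closes
       }
       // Update the sent record timestamp to the window end time if possible.
       final long newTimestamp =
           windowedkey.key() != null ? windowedkey.window().end() : oldTimestamp;
       tupleForwarder.maybeForward(
           new Record<>(windowedkey, new Change<>(newAgg, oldAgg), newTimestamp));
   }
   ```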



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedSessionStore.java:
##########
@@ -61,6 +65,18 @@ public <R> QueryResult<R> query(final Query<R> query,
         );
     }
 
+    @Override
+    public KeyValueIterator<Windowed<Bytes>, byte[]> findSessions(final Instant earliestSessionEndTime,
+                                                                  final Instant latestSessionEndTime) {
+        final long earliestEndTime = ApiUtils.validateMillisecondInstant(earliestSessionEndTime,
+                prepareMillisCheckFailMsgPrefix(earliestSessionEndTime, "earliestSessionEndTime"));
+        final long latestEndTime = ApiUtils.validateMillisecondInstant(latestSessionEndTime,
+                prepareMillisCheckFailMsgPrefix(latestSessionEndTime, "latestSessionEndTime"));
+
+        final KeyValueIterator<Bytes, byte[]> bytesIterator = wrapped().fetchAll(earliestEndTime, latestEndTime);

Review Comment:
   Not sure -- I always need to think very hard (and I'm not even sure I succeed) to understand the fetch logic and how we compute the bounds...
   
   But same question as above: why do we need this new method instead of calling `findSessions(null, null, A, B)`? I briefly dug into the code and it seems it would do the same thing.
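   
   To make the comparison concrete, a sketch of the two calls in question (the helper class is illustrative; `A` and `B` are end-time bounds in epoch millis, and I'm assuming the `Instant`-based overload is declared on `SessionStore` at this stage of the PR, as the `@Override` suggests):
   
   ```java
   import java.time.Instant;
   import org.apache.kafka.common.utils.Bytes;
   import org.apache.kafka.streams.kstream.Windowed;
   import org.apache.kafka.streams.state.KeyValueIterator;
   import org.apache.kafka.streams.state.SessionStore;
   
   final class FindSessionsComparison {
       // If the two paths are indeed equivalent, both iterators should cover
       // exactly the sessions whose end time falls in [A, B].
       static void compare(final SessionStore<Bytes, byte[]> store, final long A, final long B) {
           // New overload added by this PR: bounds on session end time only.
           try (final KeyValueIterator<Windowed<Bytes>, byte[]> viaNewMethod =
                    store.findSessions(Instant.ofEpochMilli(A), Instant.ofEpochMilli(B));
                // Existing variant with open key bounds, as suggested above.
                final KeyValueIterator<Windowed<Bytes>, byte[]> viaExisting =
                    store.findSessions(null, null, A, B)) {
               // compare iterator contents here
           }
       }
   }
   ```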



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/InMemorySessionStore.java:
##########
@@ -202,25 +205,43 @@ public void remove(final Windowed<Bytes> sessionKey) {
 
     @Override
     public byte[] fetchSession(final Bytes key,
-                               final long earliestSessionEndTime,
-                               final long latestSessionStartTime) {
+                               final long sessionStartTime,
+                               final long sessionEndTime) {
         removeExpiredSegments();
 
         Objects.requireNonNull(key, "key cannot be null");
 
         // Only need to search if the record hasn't expired yet
-        if (latestSessionStartTime > observedStreamTime - retentionPeriod) {
-            final ConcurrentNavigableMap<Bytes, ConcurrentNavigableMap<Long, byte[]>> keyMap = endTimeMap.get(latestSessionStartTime);
+        if (sessionEndTime > observedStreamTime - retentionPeriod) {
+            final ConcurrentNavigableMap<Bytes, ConcurrentNavigableMap<Long, byte[]>> keyMap = endTimeMap.get(sessionEndTime);
             if (keyMap != null) {
                 final ConcurrentNavigableMap<Long, byte[]> startTimeMap = keyMap.get(key);
                 if (startTimeMap != null) {
-                    return startTimeMap.get(earliestSessionEndTime);
+                    return startTimeMap.get(sessionStartTime);
                 }
             }
         }
         return null;
     }
 
+    @Override
+    public KeyValueIterator<Windowed<Bytes>, byte[]> findSessions(final Instant earliestSessionEndTime,
+                                                                  final Instant latestSessionEndTime) {
+        removeExpiredSegments();
+
+        final long earliestEndTime = ApiUtils.validateMillisecondInstant(earliestSessionEndTime,
+            prepareMillisCheckFailMsgPrefix(earliestSessionEndTime, "earliestSessionEndTime"));
+        final long latestEndTime = ApiUtils.validateMillisecondInstant(latestSessionEndTime,
+            prepareMillisCheckFailMsgPrefix(latestSessionEndTime, "latestSessionEndTime"));
+
+        // since subMap is exclusive on toKey, we need to add one
+        return registerNewIterator(null,
+                                   null,
+                                    Long.MAX_VALUE,
+                                    endTimeMap.subMap(earliestEndTime, latestEndTime + 1).entrySet().iterator(),
+                                    true);

Review Comment:
   Not sure I fully understand why we add this new method instead of calling `findSessions(null, null, A, B)`?
   
   The code to create the iterator is different, but I am also not sure why. Is it semantically actually the same? Calling `findSessions(null, null, A, B)` would do:
   ```
   registerNewIterator(null, // same
                       null, // same
                       latestSessionStartTime, // why does your code pass Long.MAX_VALUE?
                       // (but because we use `tailMap` instead of `subMap` below,
                       // it seems to do the same thing overall)
                       endTimeMap.tailMap(earliestSessionEndTime, true).entrySet().iterator(),
                       true); // same
   ```
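   
   A toy, self-contained illustration of the `subMap`-vs-`tailMap` slicing difference discussed above (only the map slicing; the iterator's own filtering is not modeled):
   
   ```java
   import java.util.concurrent.ConcurrentNavigableMap;
   import java.util.concurrent.ConcurrentSkipListMap;
   
   public final class EndTimeMapSliceDemo {
       public static void main(final String[] args) {
           final ConcurrentNavigableMap<Long, String> endTimeMap = new ConcurrentSkipListMap<>();
           endTimeMap.put(5L, "w5");
           endTimeMap.put(10L, "w10");
           endTimeMap.put(15L, "w15");
   
           final long earliestEndTime = 5L;
           final long latestEndTime = 10L;
   
           // subMap's toKey is exclusive, hence the +1 to make latestEndTime inclusive:
           System.out.println(endTimeMap.subMap(earliestEndTime, latestEndTime + 1));
           // -> {5=w5, 10=w10}
   
           // tailMap has no upper bound; the existing findSessions path instead
           // bounds results via latestSessionStartTime inside the iterator:
           System.out.println(endTimeMap.tailMap(earliestEndTime, true));
           // -> {5=w5, 10=w10, 15=w15}
       }
   }
   ```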


