Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2022/05/24 17:41:30 UTC

[GitHub] [kafka] guozhangwang commented on a diff in pull request #12204: [9/N WIP][Emit final] Emit final for session window aggregations

guozhangwang commented on code in PR #12204:
URL: https://github.com/apache/kafka/pull/12204#discussion_r880778694


##########
streams/src/main/java/org/apache/kafka/streams/state/SessionStore.java:
##########
@@ -39,6 +39,13 @@
  */
 public interface SessionStore<K, AGG> extends StateStore, ReadOnlySessionStore<K, AGG> {
 
+    // TODO: javadoc; both ends are inclusive
+    default KeyValueIterator<Windowed<K>, AGG> findSessions(final Instant earliestSessionEndTime,

Review Comment:
   This is related to 1) in the description, and the first open question: is this public API worth adding? Note that I added it to SessionStore, not ReadOnlySessionStore, so that it is not exposed via IQv1; also, I've only added this function with `Instant` param types.
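   
   To make the proposal concrete, here is a minimal usage sketch of the new cross-key variant, assuming the two-`Instant` overload lands as shown in this diff (the class and helper names `FindSessionsExample` / `dumpSessions` are made up for illustration):
   
   ```java
   import java.time.Instant;
   
   import org.apache.kafka.streams.KeyValue;
   import org.apache.kafka.streams.kstream.Windowed;
   import org.apache.kafka.streams.state.KeyValueIterator;
   import org.apache.kafka.streams.state.SessionStore;
   
   public final class FindSessionsExample {
       // Iterate over all sessions, across all keys, whose end time falls in
       // [earliestEnd, latestEnd] -- both bounds inclusive per the TODO above.
       static <K, AGG> void dumpSessions(final SessionStore<K, AGG> store,
                                         final Instant earliestEnd,
                                         final Instant latestEnd) {
           try (final KeyValueIterator<Windowed<K>, AGG> iter =
                    store.findSessions(earliestEnd, latestEnd)) {
               while (iter.hasNext()) {
                   final KeyValue<Windowed<K>, AGG> session = iter.next();
                   System.out.println(session.key + " -> " + session.value);
               }
           }
       }
   }
   ```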



##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java:
##########
@@ -148,55 +173,184 @@ public void process(final Record<KIn, VIn> record) {
                 }
             }
 
-            if (mergedWindow.end() < closeTime) {
-                if (context().recordMetadata().isPresent()) {
-                    final RecordMetadata recordMetadata = context().recordMetadata().get();
-                    LOG.warn(
-                        "Skipping record for expired window. " +
-                            "topic=[{}] " +
-                            "partition=[{}] " +
-                            "offset=[{}] " +
-                            "timestamp=[{}] " +
-                            "window=[{},{}] " +
-                            "expiration=[{}] " +
-                            "streamTime=[{}]",
-                        recordMetadata.topic(), recordMetadata.partition(), recordMetadata.offset(),
-                        timestamp,
-                        mergedWindow.start(), mergedWindow.end(),
-                        closeTime,
-                        observedStreamTime
-                    );
-                } else {
-                    LOG.warn(
-                        "Skipping record for expired window. Topic, partition, and offset not known. " +
-                            "timestamp=[{}] " +
-                            "window=[{},{}] " +
-                            "expiration=[{}] " +
-                            "streamTime=[{}]",
-                        timestamp,
-                        mergedWindow.start(), mergedWindow.end(),
-                        closeTime,
-                        observedStreamTime
-                    );
-                }
-                droppedRecordsSensor.record();
+            if (mergedWindow.end() < windowCloseTime) {
+                logSkippedRecordForExpiredWindow(timestamp, windowCloseTime, mergedWindow);
             } else {
                 if (!mergedWindow.equals(newSessionWindow)) {
                     for (final KeyValue<Windowed<KIn>, VAgg> session : merged) {
                         store.remove(session.key);
+
+                        maybeForwardUpdate(session.key, session.value, null, record.timestamp());
+                        /*
                         tupleForwarder.maybeForward(
                             record.withKey(session.key)
                                 .withValue(new Change<>(null, session.value)));
+                         */
                     }
                 }
 
                 agg = aggregator.apply(record.key(), record.value(), agg);
                 final Windowed<KIn> sessionKey = new Windowed<>(record.key(), mergedWindow);
                 store.put(sessionKey, agg);
+
+                maybeForwardUpdate(sessionKey, null, agg, record.timestamp());
+                /*
                 tupleForwarder.maybeForward(
                     record.withKey(sessionKey)
                         .withValue(new Change<>(agg, null)));
+                 */
             }
+
+            maybeForwardFinalResult(record, windowCloseTime);
+        }
+
+        private void maybeForwardUpdate(final Windowed<KIn> windowedkey,
+                                        final VAgg oldAgg,
+                                        final VAgg newAgg,
+                                        final long oldTimestamp) {
+            if (emitStrategy.type() == EmitStrategy.StrategyType.ON_WINDOW_CLOSE) {
+                return;
+            }
+
+            // Update the sent record timestamp to the window end time if possible
+            final long newTimestamp = windowedkey.key() != null ? windowedkey.window().end() : oldTimestamp;
+
+            tupleForwarder.maybeForward(new Record<>(windowedkey, new Change<>(newAgg, sendOldValues ? oldAgg : null), newTimestamp));
+        }
+
+        // TODO: consolidate SessionWindow with TimeWindow to merge common functions

Review Comment:
   I realize that our SessionWindow and TimeWindow, and even SlidingWindow, cause a lot of code duplication (e.g. here). We could consolidate them into a single class with boolean flags indicating whether the start/end are inclusive or exclusive, which would further reduce the duplication. Will file a JIRA for it.
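   
   Roughly what I have in mind -- a hypothetical sketch, not the actual Kafka window classes (the name `BoundedWindow` is made up):
   
   ```java
   // One window class with boundary-inclusiveness flags, replacing the
   // separate SessionWindow / TimeWindow / SlidingWindow subclasses.
   public final class BoundedWindow {
       private final long startMs;
       private final long endMs;
       private final boolean startInclusive;
       private final boolean endInclusive;
   
       public BoundedWindow(final long startMs, final long endMs,
                            final boolean startInclusive, final boolean endInclusive) {
           this.startMs = startMs;
           this.endMs = endMs;
           this.startInclusive = startInclusive;
           this.endInclusive = endInclusive;
       }
   
       // A time window is [start, end); a session window is [start, end].
       public static BoundedWindow timeWindow(final long startMs, final long endMs) {
           return new BoundedWindow(startMs, endMs, true, false);
       }
   
       public static BoundedWindow sessionWindow(final long startMs, final long endMs) {
           return new BoundedWindow(startMs, endMs, true, true);
       }
   
       // One shared overlap check replaces the per-subclass implementations:
       // normalize both windows to inclusive millisecond bounds, then apply
       // the standard interval-intersection test.
       public boolean overlap(final BoundedWindow other) {
           final long thisStart = startInclusive ? startMs : startMs + 1;
           final long thisEnd = endInclusive ? endMs : endMs - 1;
           final long otherStart = other.startInclusive ? other.startMs : other.startMs + 1;
           final long otherEnd = other.endInclusive ? other.endMs : other.endMs - 1;
           return thisStart <= otherEnd && otherStart <= thisEnd;
       }
   }
   ```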



##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimestampedTupleForwarder.java:
##########
@@ -38,7 +39,7 @@
     @SuppressWarnings({"unchecked", "rawtypes"})
     TimestampedTupleForwarder(final StateStore store,
                               final ProcessorContext<K, Change<V>> context,
-                              final TimestampedCacheFlushListener<K, V> flushListener,
+                              final CacheFlushListener<K, ?> flushListener,

Review Comment:
   This is 3) in the description: since we can use the base class here, we no longer need the duplicated TupleForwarder.
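   
   The idea, illustrated with simplified stand-in types (not the real internal classes): once the forwarder only depends on the base listener interface, a single forwarder serves every listener flavor.
   
   ```java
   // Simplified stand-in for the internal listener hierarchy.
   interface CacheFlushListener<K, V> {
       void apply(K key, V newValue, V oldValue, long timestamp);
   }
   
   final class TimestampedListener<K, V> implements CacheFlushListener<K, V> {
       @Override
       public void apply(final K key, final V newValue, final V oldValue, final long timestamp) {
           // forward the change downstream, elided
       }
   }
   
   final class TupleForwarder<K, V> {
       // Accepting the base type CacheFlushListener<K, ?> means any listener
       // implementation can be wired in, so no per-flavor forwarder is needed.
       TupleForwarder(final CacheFlushListener<K, ?> flushListener) {
           // register flushListener with the caching state store, elided
       }
   }
   ```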



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/InMemorySessionStore.java:
##########
@@ -202,25 +205,43 @@ public void remove(final Windowed<Bytes> sessionKey) {
 
     @Override
     public byte[] fetchSession(final Bytes key,
-                               final long earliestSessionEndTime,
-                               final long latestSessionStartTime) {
+                               final long sessionStartTime,

Review Comment:
   This is a minor fix to the param names: the old ones were simply wrong and misleading.



##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java:
##########
@@ -148,55 +173,184 @@ public void process(final Record<KIn, VIn> record) {
                 }
             }
 
-            if (mergedWindow.end() < closeTime) {
-                if (context().recordMetadata().isPresent()) {
-                    final RecordMetadata recordMetadata = context().recordMetadata().get();
-                    LOG.warn(
-                        "Skipping record for expired window. " +
-                            "topic=[{}] " +
-                            "partition=[{}] " +
-                            "offset=[{}] " +
-                            "timestamp=[{}] " +
-                            "window=[{},{}] " +
-                            "expiration=[{}] " +
-                            "streamTime=[{}]",
-                        recordMetadata.topic(), recordMetadata.partition(), recordMetadata.offset(),
-                        timestamp,
-                        mergedWindow.start(), mergedWindow.end(),
-                        closeTime,
-                        observedStreamTime
-                    );
-                } else {
-                    LOG.warn(
-                        "Skipping record for expired window. Topic, partition, and offset not known. " +
-                            "timestamp=[{}] " +
-                            "window=[{},{}] " +
-                            "expiration=[{}] " +
-                            "streamTime=[{}]",
-                        timestamp,
-                        mergedWindow.start(), mergedWindow.end(),
-                        closeTime,
-                        observedStreamTime
-                    );
-                }
-                droppedRecordsSensor.record();
+            if (mergedWindow.end() < windowCloseTime) {
+                logSkippedRecordForExpiredWindow(timestamp, windowCloseTime, mergedWindow);
             } else {
                 if (!mergedWindow.equals(newSessionWindow)) {
                     for (final KeyValue<Windowed<KIn>, VAgg> session : merged) {
                         store.remove(session.key);
+
+                        maybeForwardUpdate(session.key, session.value, null, record.timestamp());
+                        /*
                         tupleForwarder.maybeForward(
                             record.withKey(session.key)
                                 .withValue(new Change<>(null, session.value)));
+                         */
                     }
                 }
 
                 agg = aggregator.apply(record.key(), record.value(), agg);
                 final Windowed<KIn> sessionKey = new Windowed<>(record.key(), mergedWindow);
                 store.put(sessionKey, agg);
+
+                maybeForwardUpdate(sessionKey, null, agg, record.timestamp());
+                /*

Review Comment:
   Will remove the commented-out code when removing the WIP label; ditto elsewhere.



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/SegmentedBytesStore.java:
##########
@@ -91,8 +91,8 @@ public interface SegmentedBytesStore extends StateStore {
     /**
      * Gets all the key-value pairs that belong to the windows within in the given time range.
      *
-     * @param from the beginning of the time slot from which to search
-     * @param to   the end of the time slot from which to search
+     * @param from the beginning of the time slot from which to search (inclusive)

Review Comment:
   Minor javadoc improvement to remind developers that the bounds are inclusive.



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedSessionStore.java:
##########
@@ -61,6 +65,18 @@ public <R> QueryResult<R> query(final Query<R> query,
         );
     }
 
+    @Override
+    public KeyValueIterator<Windowed<Bytes>, byte[]> findSessions(final Instant earliestSessionEndTime,
+                                                                  final Instant latestSessionEndTime) {
+        final long earliestEndTime = ApiUtils.validateMillisecondInstant(earliestSessionEndTime,
+                prepareMillisCheckFailMsgPrefix(earliestSessionEndTime, "earliestSessionEndTime"));
+        final long latestEndTime = ApiUtils.validateMillisecondInstant(latestSessionEndTime,
+                prepareMillisCheckFailMsgPrefix(latestSessionEndTime, "latestSessionEndTime"));
+
+        final KeyValueIterator<Bytes, byte[]> bytesIterator = wrapped().fetchAll(earliestEndTime, latestEndTime);

Review Comment:
   This is the second open question: with the current prefixed (base, i.e. time-first) session key schema, this fetchAll would effectively search `[earliestEnd, INF]` because of this logic: https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/state/internals/PrefixedSessionKeySchemas.java#L46
   
   This is because we translate the range query without a key inside `AbstractRocksDBTimeOrderedSegmentedBytesStore` by using `lower/upperRange` instead of `lower/upperRangeFixedSize`: https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBTimeOrderedSegmentedBytesStore.java#L241-L242
   
   I cannot remember why we need to do this. @lihaosky @mjsax do you remember why?
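   
   For context, a self-contained sketch (NOT the real PrefixedSessionKeySchemas; the layout is simplified to an 8-byte big-endian end-timestamp prefix followed by the key bytes) of how tight timestamp-only scan bounds would look under a time-first layout. Whether the generic `upperRange` helper can produce such a tight bound is exactly the open question above.
   
   ```java
   import java.nio.ByteBuffer;
   
   // Illustrative composite key: [8-byte big-endian end-timestamp][record key bytes].
   // Big-endian encoding makes lexicographic byte order agree with numeric
   // timestamp order for non-negative timestamps.
   public final class TimeFirstKeySketch {
   
       // Smallest composite key with end time >= earliestEnd: just the
       // timestamp prefix, with zero key bytes following it.
       static byte[] lowerBound(final long earliestEnd) {
           return ByteBuffer.allocate(Long.BYTES).putLong(earliestEnd).array();
       }
   
       // Exclusive upper bound for end time <= latestEnd: the first composite
       // key whose timestamp prefix is latestEnd + 1. Any key bytes following
       // a timestamp <= latestEnd sort strictly below this.
       static byte[] upperBoundExclusive(final long latestEnd) {
           return ByteBuffer.allocate(Long.BYTES).putLong(latestEnd + 1).array();
       }
   
       // Reference predicate the scan bounds are meant to reproduce.
       static boolean inRange(final byte[] compositeKey, final long earliestEnd, final long latestEnd) {
           final long endTs = ByteBuffer.wrap(compositeKey, 0, Long.BYTES).getLong();
           return endTs >= earliestEnd && endTs <= latestEnd;
       }
   }
   ```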
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org