Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2022/06/02 23:29:07 UTC

[GitHub] [kafka] guozhangwang commented on a diff in pull request #12204: [9/N WIP][Emit final] Emit final for session window aggregations

guozhangwang commented on code in PR #12204:
URL: https://github.com/apache/kafka/pull/12204#discussion_r888483482


##########
streams/src/main/java/org/apache/kafka/streams/state/internals/InMemorySessionStore.java:
##########
@@ -202,25 +205,43 @@ public void remove(final Windowed<Bytes> sessionKey) {
 
     @Override
     public byte[] fetchSession(final Bytes key,
-                               final long earliestSessionEndTime,
-                               final long latestSessionStartTime) {
+                               final long sessionStartTime,
+                               final long sessionEndTime) {
         removeExpiredSegments();
 
         Objects.requireNonNull(key, "key cannot be null");
 
         // Only need to search if the record hasn't expired yet
-        if (latestSessionStartTime > observedStreamTime - retentionPeriod) {
-            final ConcurrentNavigableMap<Bytes, ConcurrentNavigableMap<Long, byte[]>> keyMap = endTimeMap.get(latestSessionStartTime);
+        if (sessionEndTime > observedStreamTime - retentionPeriod) {
+            final ConcurrentNavigableMap<Bytes, ConcurrentNavigableMap<Long, byte[]>> keyMap = endTimeMap.get(sessionEndTime);
             if (keyMap != null) {
                 final ConcurrentNavigableMap<Long, byte[]> startTimeMap = keyMap.get(key);
                 if (startTimeMap != null) {
-                    return startTimeMap.get(earliestSessionEndTime);
+                    return startTimeMap.get(sessionStartTime);
                 }
             }
         }
         return null;
     }
 
+    @Override
+    public KeyValueIterator<Windowed<Bytes>, byte[]> findSessions(final Instant earliestSessionEndTime,
+                                                                  final Instant latestSessionEndTime) {
+        removeExpiredSegments();
+
+        final long earliestEndTime = ApiUtils.validateMillisecondInstant(earliestSessionEndTime,
+            prepareMillisCheckFailMsgPrefix(earliestSessionEndTime, "earliestSessionEndTime"));
+        final long latestEndTime = ApiUtils.validateMillisecondInstant(latestSessionEndTime,
+            prepareMillisCheckFailMsgPrefix(latestSessionEndTime, "latestSessionEndTime"));
+
+        // since subMap is exclusive on toKey, we need to plus one
+        return registerNewIterator(null,
+                                   null,
+                                    Long.MAX_VALUE,
+                                    endTimeMap.subMap(earliestEndTime, latestEndTime + 1).entrySet().iterator(),
+                                    true);

Review Comment:
   Logically, the main reason is that emit-final needs a range query whose lower and upper bounds are both on the session `endTime`, which is why the parameters are `earliestSessionEndTime` and `latestSessionEndTime`.
   
   The existing functions, by contrast, take `earliestSessionEndTime` and `latestSessionStartTime`. That is also the reason for using `Long.MAX_VALUE` here.
   
   As for the physical implementation, the main difference is in the RocksDB session store rather than the in-memory session store. I will reply there separately.
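
   To illustrate the end-time range query described above, here is a minimal sketch, not the PR's code. It assumes a simplified in-memory layout (end time, then key, then start time) with `String` keys and values; the class `EndTimeRangeSketch` and its `put` and `findSessions(long, long)` methods are hypothetical names introduced only for this example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;

// Hypothetical, simplified stand-in for an in-memory session store.
class EndTimeRangeSketch {
    // Layout assumed for illustration: endTime -> key -> startTime -> value.
    private final ConcurrentNavigableMap<Long, ConcurrentNavigableMap<String, ConcurrentNavigableMap<Long, String>>> endTimeMap =
        new ConcurrentSkipListMap<>();

    void put(final String key, final long startTime, final long endTime, final String value) {
        endTimeMap.computeIfAbsent(endTime, e -> new ConcurrentSkipListMap<>())
                  .computeIfAbsent(key, k -> new ConcurrentSkipListMap<>())
                  .put(startTime, value);
    }

    // Returns the values of all sessions whose END time lies in
    // [earliestEndTime, latestEndTime], ordered by end time, then key, then start time.
    List<String> findSessions(final long earliestEndTime, final long latestEndTime) {
        final List<String> result = new ArrayList<>();
        // The four-argument subMap overload makes both bounds inclusive,
        // so no "+ 1" adjustment of the upper bound is needed.
        final ConcurrentNavigableMap<Long, ConcurrentNavigableMap<String, ConcurrentNavigableMap<Long, String>>> range =
            endTimeMap.subMap(earliestEndTime, true, latestEndTime, true);
        for (final ConcurrentNavigableMap<String, ConcurrentNavigableMap<Long, String>> byKey : range.values()) {
            for (final Map.Entry<String, ConcurrentNavigableMap<Long, String>> keyEntry : byKey.entrySet()) {
                result.addAll(keyEntry.getValue().values());
            }
        }
        return result;
    }
}
```

   Because the outer map is ordered by end time, a query bounded by end time on both sides is a single `subMap` call, and neither the key nor the start time restricts the result. The sketch uses the inclusive overload of `subMap` rather than `latestEndTime + 1`; either gives the same range for ordinary timestamps, but the inclusive form also stays well-defined if the upper bound is `Long.MAX_VALUE`.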



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedSessionStore.java:
##########
@@ -61,6 +65,18 @@ public <R> QueryResult<R> query(final Query<R> query,
         );
     }
 
+    @Override
+    public KeyValueIterator<Windowed<Bytes>, byte[]> findSessions(final Instant earliestSessionEndTime,
+                                                                  final Instant latestSessionEndTime) {
+        final long earliestEndTime = ApiUtils.validateMillisecondInstant(earliestSessionEndTime,
+                prepareMillisCheckFailMsgPrefix(earliestSessionEndTime, "earliestSessionEndTime"));
+        final long latestEndTime = ApiUtils.validateMillisecondInstant(latestSessionEndTime,
+                prepareMillisCheckFailMsgPrefix(latestSessionEndTime, "latestSessionEndTime"));
+
+        final KeyValueIterator<Bytes, byte[]> bytesIterator = wrapped().fetchAll(earliestEndTime, latestEndTime);

Review Comment:
   The main reason is that for emit-final I need a range API that is based on `endTime` for both ends. That is also why its implementation has to use `fetchAll` instead of `fetch` here.



##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractKStreamTimeWindowAggregateProcessor.java:
##########
@@ -73,11 +73,10 @@ public void init(final ProcessorContext<Windowed<KIn>, Change<VAgg>> context) {
         internalProcessorContext = (InternalProcessorContext<Windowed<KIn>, Change<VAgg>>) context;
         final StreamsMetricsImpl metrics = internalProcessorContext.metrics();
         final String threadId = Thread.currentThread().getName();
+        final String processorName = internalProcessorContext.currentNode().name();
         droppedRecordsSensor = droppedRecordsSensor(threadId, context.taskId().toString(), metrics);
-        emittedRecordsSensor = emittedRecordsSensor(threadId, context.taskId().toString(),
-            internalProcessorContext.currentNode().name(), metrics);
-        emitFinalLatencySensor = emitFinalLatencySensor(threadId, context.taskId().toString(),
-            internalProcessorContext.currentNode().name(), metrics);
+        emittedRecordsSensor = emittedRecordsSensor(threadId, context.taskId().toString(), processorName, metrics);
+        emitFinalLatencySensor = emitFinalLatencySensor(threadId, context.taskId().toString(), processorName, metrics);

Review Comment:
   Yes! My bad for mingling them together here.


