You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/09/09 16:34:05 UTC

[GitHub] [kafka] jeqo opened a new pull request #11315: KAFKA-10540: Migrate KStream aggregate operations

jeqo opened a new pull request #11315:
URL: https://github.com/apache/kafka/pull/11315


   As part of the migration of KStream/KTable operations to the new Processor API https://issues.apache.org/jira/browse/KAFKA-8410, this PR includes the migration of KStream aggregate/reduce operations.
   
   Testing strategy: operations should keep the same tests as new processor should be compatible.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] vvcephei commented on pull request #11315: KAFKA-10540: Migrate KStream aggregate operations

Posted by GitBox <gi...@apache.org>.
vvcephei commented on pull request #11315:
URL: https://github.com/apache/kafka/pull/11315#issuecomment-931487033


   I ran the tests locally and got a pass: `./gradlew clean :streams:testAll`


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] vvcephei merged pull request #11315: KAFKA-10540: Migrate KStream aggregate operations

Posted by GitBox <gi...@apache.org>.
vvcephei merged pull request #11315:
URL: https://github.com/apache/kafka/pull/11315


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] vvcephei commented on a change in pull request #11315: KAFKA-10540: Migrate KStream aggregate operations

Posted by GitBox <gi...@apache.org>.
vvcephei commented on a change in pull request #11315:
URL: https://github.com/apache/kafka/pull/11315#discussion_r714207540



##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java
##########
@@ -77,43 +83,51 @@ public void init(final org.apache.kafka.streams.processor.ProcessorContext conte
         }
 
         @Override
-        public void process(final K key, final V value) {
+        public void process(final Record<KIn, VIn> record) {
             // If the key or value is null we don't need to proceed
-            if (key == null || value == null) {
-                LOG.warn(
-                    "Skipping record due to null key or value. key=[{}] value=[{}] topic=[{}] partition=[{}] offset=[{}]",
-                    key, value, context().topic(), context().partition(), context().offset()
-                );
+            if (record.key() == null || record.value() == null) {
+                if (context.recordMetadata().isPresent()) {

Review comment:
       Huh, it's weird that the ContextualProcessor exposes both the `context` field and the `context()` getter for the same field. Would you mind making that field private so that all references go through the method?
   
   ```suggestion
                   if (context().recordMetadata().isPresent()) {
   ```

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java
##########
@@ -77,43 +83,51 @@ public void init(final org.apache.kafka.streams.processor.ProcessorContext conte
         }
 
         @Override
-        public void process(final K key, final V value) {
+        public void process(final Record<KIn, VIn> record) {
             // If the key or value is null we don't need to proceed
-            if (key == null || value == null) {
-                LOG.warn(
-                    "Skipping record due to null key or value. key=[{}] value=[{}] topic=[{}] partition=[{}] offset=[{}]",
-                    key, value, context().topic(), context().partition(), context().offset()
-                );
+            if (record.key() == null || record.value() == null) {
+                if (context.recordMetadata().isPresent()) {
+                    final RecordMetadata recordMetadata = context.recordMetadata().get();
+                    LOG.warn(
+                        "Skipping record due to null key or value. "
+                            + "topic=[{}] partition=[{}] offset=[{}]",
+                        recordMetadata.topic(), recordMetadata.partition(), recordMetadata.offset()
+                    );
+                } else {
+                    LOG.warn(
+                        "Skipping record due to null key. Topic, partition, and offset not known."

Review comment:
       ```suggestion
                           "Skipping record due to null key or value. Topic, partition, and offset not known."
   ```

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java
##########
@@ -74,92 +79,118 @@ public void enableSendingOldValues() {
         sendOldValues = true;
     }
 
-    private class KStreamSessionWindowAggregateProcessor extends org.apache.kafka.streams.processor.AbstractProcessor<K, V> {
+    private class KStreamSessionWindowAggregateProcessor extends
+        ContextualProcessor<KIn, VIn, Windowed<KIn>, Change<VAgg>> {
 
-        private SessionStore<K, Agg> store;
-        private SessionTupleForwarder<K, Agg> tupleForwarder;
+        private SessionStore<KIn, VAgg> store;
+        private SessionTupleForwarder<KIn, VAgg> tupleForwarder;
         private Sensor droppedRecordsSensor;
         private long observedStreamTime = ConsumerRecord.NO_TIMESTAMP;
 
         @Override
-        public void init(final org.apache.kafka.streams.processor.ProcessorContext context) {
+        public void init(final ProcessorContext<Windowed<KIn>, Change<VAgg>> context) {
             super.init(context);
             final StreamsMetricsImpl metrics = (StreamsMetricsImpl) context.metrics();
             final String threadId = Thread.currentThread().getName();
-            droppedRecordsSensor = droppedRecordsSensor(threadId, context.taskId().toString(), metrics);
+            droppedRecordsSensor = droppedRecordsSensor(threadId, context.taskId().toString(),
+                metrics);
             store = context.getStateStore(storeName);
-            tupleForwarder = new SessionTupleForwarder<>(store, context, new SessionCacheFlushListener<>(context), sendOldValues);
+            tupleForwarder = new SessionTupleForwarder<>(
+                store,
+                context,
+                new SessionCacheFlushListener<>(context),
+                sendOldValues
+            );
         }
 
         @Override
-        public void process(final K key, final V value) {
+        public void process(final Record<KIn, VIn> record) {
             // if the key is null, we do not need proceed aggregating
             // the record with the table
-            if (key == null) {
-                LOG.warn(
-                    "Skipping record due to null key. value=[{}] topic=[{}] partition=[{}] offset=[{}]",
-                    value, context().topic(), context().partition(), context().offset()
-                );
+            if (record.key() == null) {
+                if (context.recordMetadata().isPresent()) {
+                    final RecordMetadata recordMetadata = context.recordMetadata().get();
+                    LOG.warn(
+                        "Skipping record due to null key. "
+                            + "topic=[{}] partition=[{}] offset=[{}]",
+                        recordMetadata.topic(), recordMetadata.partition(), recordMetadata.offset()
+                    );
+                } else {
+                    LOG.warn(
+                        "Skipping record due to null key. Topic, partition, and offset not known."
+                    );
+                }
                 droppedRecordsSensor.record();
                 return;
             }
 
-            final long timestamp = context().timestamp();
+            final long timestamp = record.timestamp();
             observedStreamTime = Math.max(observedStreamTime, timestamp);
             final long closeTime = observedStreamTime - windows.gracePeriodMs() - windows.inactivityGap();
 
-            final List<KeyValue<Windowed<K>, Agg>> merged = new ArrayList<>();
+            final List<KeyValue<Windowed<KIn>, VAgg>> merged = new ArrayList<>();
             final SessionWindow newSessionWindow = new SessionWindow(timestamp, timestamp);
             SessionWindow mergedWindow = newSessionWindow;
-            Agg agg = initializer.apply();
+            VAgg agg = initializer.apply();
 
             try (
-                final KeyValueIterator<Windowed<K>, Agg> iterator = store.findSessions(
-                    key,
+                final KeyValueIterator<Windowed<KIn>, VAgg> iterator = store.findSessions(
+                    record.key(),
                     timestamp - windows.inactivityGap(),
                     timestamp + windows.inactivityGap()
                 )
             ) {
                 while (iterator.hasNext()) {
-                    final KeyValue<Windowed<K>, Agg> next = iterator.next();
+                    final KeyValue<Windowed<KIn>, VAgg> next = iterator.next();
                     merged.add(next);
-                    agg = sessionMerger.apply(key, agg, next.value);
+                    agg = sessionMerger.apply(record.key(), agg, next.value);
                     mergedWindow = mergeSessionWindow(mergedWindow, (SessionWindow) next.key.window());
                 }
             }
 
             if (mergedWindow.end() < closeTime) {
-                LOG.warn(
-                    "Skipping record for expired window. " +
-                        "key=[{}] " +
-                        "topic=[{}] " +
-                        "partition=[{}] " +
-                        "offset=[{}] " +
-                        "timestamp=[{}] " +
-                        "window=[{},{}] " +
-                        "expiration=[{}] " +
-                        "streamTime=[{}]",
-                    key,
-                    context().topic(),
-                    context().partition(),
-                    context().offset(),
-                    timestamp,
-                    mergedWindow.start(),
-                    mergedWindow.end(),
-                    closeTime,
-                    observedStreamTime
-                );
+                final RecordMetadata recordMetadata = context.recordMetadata().get();
+                if (context.recordMetadata().isPresent()) {

Review comment:
       It seems like if this were false, the L152 would have thrown an exception.

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionTupleForwarder.java
##########
@@ -32,13 +32,13 @@
  * @param <V>
  */
 class SessionTupleForwarder<K, V> {
-    private final ProcessorContext context;
+    private final ProcessorContext<Windowed<K>, Change<V>> context;
     private final boolean sendOldValues;
     private final boolean cachingEnabled;
 
     @SuppressWarnings("unchecked")
     SessionTupleForwarder(final StateStore store,
-                          final ProcessorContext context,
+                          final ProcessorContext<Windowed<K>, Change<V>> context,

Review comment:
       I'm loving the improved clarity here. Thank you!

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##########
@@ -95,52 +101,71 @@ public void init(final org.apache.kafka.streams.processor.ProcessorContext conte
         }
 
         @Override
-        public void process(final K key, final V value) {
-            if (key == null || value == null) {
-                log.warn(
-                    "Skipping record due to null key or value. value=[{}] topic=[{}] partition=[{}] offset=[{}]",
-                    value, context().topic(), context().partition(), context().offset()
-                );
+        public void process(final Record<KIn, VIn> record) {
+            if (record.key() == null || record.value() == null) {
+                if (context.recordMetadata().isPresent()) {
+                    final RecordMetadata recordMetadata = context.recordMetadata().get();
+                    log.warn(
+                        "Skipping record due to null key or value. "
+                            + "topic=[{}] partition=[{}] offset=[{}]",
+                        recordMetadata.topic(), recordMetadata.partition(), recordMetadata.offset()
+                    );
+                } else {
+                    log.warn(
+                        "Skipping record due to null key. Topic, partition, and offset not known."

Review comment:
       ```suggestion
                           "Skipping record due to null key or value. Topic, partition, and offset not known."
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] vvcephei commented on pull request #11315: KAFKA-10540: Migrate KStream aggregate operations

Posted by GitBox <gi...@apache.org>.
vvcephei commented on pull request #11315:
URL: https://github.com/apache/kafka/pull/11315#issuecomment-931374193


   Actually, I just looked at the results, and I don't think I'll bother re-triggering the tests. None of those are possibly related:
   
   Test Name | Duration | Age
   -- | -- | --
   Build / JDK 8 and Scala 2.12 / kafka.api.TransactionsTest.testCommitTransactionTimeout() | 10 sec | 1
   Build  / JDK 17 and Scala 2.13 / kafka.admin.LeaderElectionCommandTest.[1]  Type=Raft, Name=testPreferredReplicaElection, Security=PLAINTEXT | 1 min 18 sec | 1
   Build / JDK 17 and Scala 2.13 / kafka.controller.ControllerIntegrationTest.testPartitionReassignmentToBrokerWithOfflineLogDir() | 3.5 sec | 1
   Build  / JDK 11 and Scala 2.13 /  org.apache.kafka.streams.integration.NamedTopologyIntegrationTest.shouldPrefixAllInternalTopicNamesWithNamedTopology | 20 sec | 1


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] jeqo commented on a change in pull request #11315: KAFKA-10540: Migrate KStream aggregate operations

Posted by GitBox <gi...@apache.org>.
jeqo commented on a change in pull request #11315:
URL: https://github.com/apache/kafka/pull/11315#discussion_r714236064



##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java
##########
@@ -74,92 +79,118 @@ public void enableSendingOldValues() {
         sendOldValues = true;
     }
 
-    private class KStreamSessionWindowAggregateProcessor extends org.apache.kafka.streams.processor.AbstractProcessor<K, V> {
+    private class KStreamSessionWindowAggregateProcessor extends
+        ContextualProcessor<KIn, VIn, Windowed<KIn>, Change<VAgg>> {
 
-        private SessionStore<K, Agg> store;
-        private SessionTupleForwarder<K, Agg> tupleForwarder;
+        private SessionStore<KIn, VAgg> store;
+        private SessionTupleForwarder<KIn, VAgg> tupleForwarder;
         private Sensor droppedRecordsSensor;
         private long observedStreamTime = ConsumerRecord.NO_TIMESTAMP;
 
         @Override
-        public void init(final org.apache.kafka.streams.processor.ProcessorContext context) {
+        public void init(final ProcessorContext<Windowed<KIn>, Change<VAgg>> context) {
             super.init(context);
             final StreamsMetricsImpl metrics = (StreamsMetricsImpl) context.metrics();
             final String threadId = Thread.currentThread().getName();
-            droppedRecordsSensor = droppedRecordsSensor(threadId, context.taskId().toString(), metrics);
+            droppedRecordsSensor = droppedRecordsSensor(threadId, context.taskId().toString(),
+                metrics);
             store = context.getStateStore(storeName);
-            tupleForwarder = new SessionTupleForwarder<>(store, context, new SessionCacheFlushListener<>(context), sendOldValues);
+            tupleForwarder = new SessionTupleForwarder<>(
+                store,
+                context,
+                new SessionCacheFlushListener<>(context),
+                sendOldValues
+            );
         }
 
         @Override
-        public void process(final K key, final V value) {
+        public void process(final Record<KIn, VIn> record) {
             // if the key is null, we do not need proceed aggregating
             // the record with the table
-            if (key == null) {
-                LOG.warn(
-                    "Skipping record due to null key. value=[{}] topic=[{}] partition=[{}] offset=[{}]",
-                    value, context().topic(), context().partition(), context().offset()
-                );
+            if (record.key() == null) {
+                if (context.recordMetadata().isPresent()) {
+                    final RecordMetadata recordMetadata = context.recordMetadata().get();
+                    LOG.warn(
+                        "Skipping record due to null key. "
+                            + "topic=[{}] partition=[{}] offset=[{}]",
+                        recordMetadata.topic(), recordMetadata.partition(), recordMetadata.offset()
+                    );
+                } else {
+                    LOG.warn(
+                        "Skipping record due to null key. Topic, partition, and offset not known."
+                    );
+                }
                 droppedRecordsSensor.record();
                 return;
             }
 
-            final long timestamp = context().timestamp();
+            final long timestamp = record.timestamp();
             observedStreamTime = Math.max(observedStreamTime, timestamp);
             final long closeTime = observedStreamTime - windows.gracePeriodMs() - windows.inactivityGap();
 
-            final List<KeyValue<Windowed<K>, Agg>> merged = new ArrayList<>();
+            final List<KeyValue<Windowed<KIn>, VAgg>> merged = new ArrayList<>();
             final SessionWindow newSessionWindow = new SessionWindow(timestamp, timestamp);
             SessionWindow mergedWindow = newSessionWindow;
-            Agg agg = initializer.apply();
+            VAgg agg = initializer.apply();
 
             try (
-                final KeyValueIterator<Windowed<K>, Agg> iterator = store.findSessions(
-                    key,
+                final KeyValueIterator<Windowed<KIn>, VAgg> iterator = store.findSessions(
+                    record.key(),
                     timestamp - windows.inactivityGap(),
                     timestamp + windows.inactivityGap()
                 )
             ) {
                 while (iterator.hasNext()) {
-                    final KeyValue<Windowed<K>, Agg> next = iterator.next();
+                    final KeyValue<Windowed<KIn>, VAgg> next = iterator.next();
                     merged.add(next);
-                    agg = sessionMerger.apply(key, agg, next.value);
+                    agg = sessionMerger.apply(record.key(), agg, next.value);
                     mergedWindow = mergeSessionWindow(mergedWindow, (SessionWindow) next.key.window());
                 }
             }
 
             if (mergedWindow.end() < closeTime) {
-                LOG.warn(
-                    "Skipping record for expired window. " +
-                        "key=[{}] " +
-                        "topic=[{}] " +
-                        "partition=[{}] " +
-                        "offset=[{}] " +
-                        "timestamp=[{}] " +
-                        "window=[{},{}] " +
-                        "expiration=[{}] " +
-                        "streamTime=[{}]",
-                    key,
-                    context().topic(),
-                    context().partition(),
-                    context().offset(),
-                    timestamp,
-                    mergedWindow.start(),
-                    mergedWindow.end(),
-                    closeTime,
-                    observedStreamTime
-                );
+                final RecordMetadata recordMetadata = context.recordMetadata().get();
+                if (context.recordMetadata().isPresent()) {

Review comment:
       woops, great catch!




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org