You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/07/21 00:12:30 UTC

[GitHub] [kafka] jeqo opened a new pull request #11099: KAFKA-10542: Migrate KTable mapValues, passthrough, and source to new Processor API

jeqo opened a new pull request #11099:
URL: https://github.com/apache/kafka/pull/11099


   As part of the migration of KStream/KTable operations to the new Processor API https://issues.apache.org/jira/browse/KAFKA-8410, this PR includes the migration of KTable:
   
   - mapValues, 
   - passthrough,
   - and source operations.
   
   Testing strategy: operations should keep the same tests as new processor should be compatible. 
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] vvcephei commented on a change in pull request #11099: KAFKA-10542: Migrate KTable mapValues, passthrough, and source to new Processor API

Posted by GitBox <gi...@apache.org>.
vvcephei commented on a change in pull request #11099:
URL: https://github.com/apache/kafka/pull/11099#discussion_r678766661



##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSource.java
##########
@@ -92,33 +98,62 @@ public void init(final org.apache.kafka.streams.processor.ProcessorContext conte
         }
 
         @Override
-        public void process(final K key, final V value) {
+        public void process(final Record<KIn, VIn> record) {
             // if the key is null, then ignore the record
-            if (key == null) {
-                LOG.warn(
-                    "Skipping record due to null key. topic=[{}] partition=[{}] offset=[{}]",
-                    context().topic(), context().partition(), context().offset()
-                );
+            if (record.key() == null) {
+                if (context.recordMetadata().isPresent()) {
+                    final RecordMetadata recordMetadata = context.recordMetadata().get();
+                    LOG.warn(
+                        "Skipping record due to null key. "
+                            + "value=[{}] topic=[{}] partition=[{}] offset=[{}]",

Review comment:
       Aha! That's an excellent find. We did make a pass and remove them all at some point, but they seem to have come back. I'm ashamed to admit that I reviewed some of those files and didn't notice it.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] jeqo commented on a change in pull request #11099: KAFKA-10542: Migrate KTable mapValues, passthrough, and source to new Processor API

Posted by GitBox <gi...@apache.org>.
jeqo commented on a change in pull request #11099:
URL: https://github.com/apache/kafka/pull/11099#discussion_r675034484



##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadTest.java
##########
@@ -90,6 +92,13 @@ public String newStoreName(final String prefix) {
                 "store-"
             );
 
+        final ProcessorSupplier<Object, Object, Void, Void> processorSupplier = () ->
+            new ContextualProcessor<Object, Object, Void, Void>() {
+                @Override
+                public void process(final Record<Object, Object> record) {
+                }

Review comment:
       Looking at the tests, they are more related to the thread and store states more than value checking. Should we add some store updates as part of the processor for _more_ correctness?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] vvcephei commented on a change in pull request #11099: KAFKA-10542: Migrate KTable mapValues, passthrough, and source to new Processor API

Posted by GitBox <gi...@apache.org>.
vvcephei commented on a change in pull request #11099:
URL: https://github.com/apache/kafka/pull/11099#discussion_r674847014



##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSource.java
##########
@@ -92,33 +98,62 @@ public void init(final org.apache.kafka.streams.processor.ProcessorContext conte
         }
 
         @Override
-        public void process(final K key, final V value) {
+        public void process(final Record<KIn, VIn> record) {
             // if the key is null, then ignore the record
-            if (key == null) {
-                LOG.warn(
-                    "Skipping record due to null key. topic=[{}] partition=[{}] offset=[{}]",
-                    context().topic(), context().partition(), context().offset()
-                );
+            if (record.key() == null) {
+                if (context.recordMetadata().isPresent()) {
+                    final RecordMetadata recordMetadata = context.recordMetadata().get();
+                    LOG.warn(
+                        "Skipping record due to null key. "
+                            + "value=[{}] topic=[{}] partition=[{}] offset=[{}]",
+                        record.value(),
+                        recordMetadata.topic(), recordMetadata.partition(), recordMetadata.offset()
+                    );
+                } else {
+                    LOG.warn(
+                        "Skipping record due to null key. "
+                            + "value=[{}]. Topic, partition, and offset not known.",
+                        record.value()
+                    );
+                }
                 droppedRecordsSensor.record();
                 return;
             }
 
             if (queryableName != null) {
-                final ValueAndTimestamp<V> oldValueAndTimestamp = store.get(key);
-                final V oldValue;
+                final ValueAndTimestamp<VIn> oldValueAndTimestamp = store.get(record.key());
+                final VIn oldValue;
                 if (oldValueAndTimestamp != null) {
                     oldValue = oldValueAndTimestamp.value();
-                    if (context().timestamp() < oldValueAndTimestamp.timestamp()) {
-                        LOG.warn("Detected out-of-order KTable update for {} at offset {}, partition {}.",
-                            store.name(), context().offset(), context().partition());
+                    if (record.timestamp() < oldValueAndTimestamp.timestamp()) {
+                        if (context.recordMetadata().isPresent()) {
+                            final RecordMetadata recordMetadata = context.recordMetadata().get();
+                            LOG.warn(
+                                "Detected out-of-order KTable update for {}, "
+                                    + "old timestamp=[{}] new timestamp=[{}]. "
+                                    + "value=[{}] topic=[{}] partition=[{}] offset=[{}].",

Review comment:
       also here

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSource.java
##########
@@ -92,33 +98,62 @@ public void init(final org.apache.kafka.streams.processor.ProcessorContext conte
         }
 
         @Override
-        public void process(final K key, final V value) {
+        public void process(final Record<KIn, VIn> record) {
             // if the key is null, then ignore the record
-            if (key == null) {
-                LOG.warn(
-                    "Skipping record due to null key. topic=[{}] partition=[{}] offset=[{}]",
-                    context().topic(), context().partition(), context().offset()
-                );
+            if (record.key() == null) {
+                if (context.recordMetadata().isPresent()) {
+                    final RecordMetadata recordMetadata = context.recordMetadata().get();
+                    LOG.warn(
+                        "Skipping record due to null key. "
+                            + "value=[{}] topic=[{}] partition=[{}] offset=[{}]",

Review comment:
       Oh, I'm sorry, but it looks like we need one more revision. Useful as it would be at times, we can't log any data (keys, values, or headers) because it might leak sensitive information into the logs.

##########
File path: streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableSourceTest.java
##########
@@ -156,7 +156,7 @@ public void kTableShouldLogAndMeterOnSkippedRecords() {
                     .filter(e -> e.getLevel().equals("WARN"))
                     .map(Event::getMessage)
                     .collect(Collectors.toList()),
-                hasItem("Skipping record due to null key. topic=[topic] partition=[0] offset=[0]")
+                hasItem("Skipping record due to null key. value=[value] topic=[topic] partition=[0] offset=[0]")

Review comment:
       I probably don't need to point this out, but this will have to change back when you remove the value from the production code.
   
   On another note, I guess we could add a test for the other (new) code path when the metadata is absent. I'll leave it up to you.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
##########
@@ -704,7 +696,7 @@ public void validateCopartition() {
     private void validateGlobalStoreArguments(final String sourceName,
                                               final String topic,
                                               final String processorName,
-                                              final ProcessorSupplier<?, ?, Void, Void> stateUpdateSupplier,
+                                              final ProcessorSupplier<?, ?, ?, ?> stateUpdateSupplier,

Review comment:
       This should be able to roll back as well, right?

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSource.java
##########
@@ -92,33 +98,62 @@ public void init(final org.apache.kafka.streams.processor.ProcessorContext conte
         }
 
         @Override
-        public void process(final K key, final V value) {
+        public void process(final Record<KIn, VIn> record) {
             // if the key is null, then ignore the record
-            if (key == null) {
-                LOG.warn(
-                    "Skipping record due to null key. topic=[{}] partition=[{}] offset=[{}]",
-                    context().topic(), context().partition(), context().offset()
-                );
+            if (record.key() == null) {
+                if (context.recordMetadata().isPresent()) {
+                    final RecordMetadata recordMetadata = context.recordMetadata().get();
+                    LOG.warn(
+                        "Skipping record due to null key. "
+                            + "value=[{}] topic=[{}] partition=[{}] offset=[{}]",
+                        record.value(),
+                        recordMetadata.topic(), recordMetadata.partition(), recordMetadata.offset()
+                    );
+                } else {
+                    LOG.warn(
+                        "Skipping record due to null key. "
+                            + "value=[{}]. Topic, partition, and offset not known.",
+                        record.value()
+                    );
+                }
                 droppedRecordsSensor.record();
                 return;
             }
 
             if (queryableName != null) {
-                final ValueAndTimestamp<V> oldValueAndTimestamp = store.get(key);
-                final V oldValue;
+                final ValueAndTimestamp<VIn> oldValueAndTimestamp = store.get(record.key());
+                final VIn oldValue;
                 if (oldValueAndTimestamp != null) {
                     oldValue = oldValueAndTimestamp.value();
-                    if (context().timestamp() < oldValueAndTimestamp.timestamp()) {
-                        LOG.warn("Detected out-of-order KTable update for {} at offset {}, partition {}.",
-                            store.name(), context().offset(), context().partition());
+                    if (record.timestamp() < oldValueAndTimestamp.timestamp()) {
+                        if (context.recordMetadata().isPresent()) {
+                            final RecordMetadata recordMetadata = context.recordMetadata().get();
+                            LOG.warn(
+                                "Detected out-of-order KTable update for {}, "
+                                    + "old timestamp=[{}] new timestamp=[{}]. "
+                                    + "value=[{}] topic=[{}] partition=[{}] offset=[{}].",
+                                store.name(),
+                                oldValueAndTimestamp.timestamp(), record.timestamp(),
+                                record.value(),
+                                recordMetadata.topic(), recordMetadata.offset(), recordMetadata.partition()
+                            );
+                        } else {
+                            LOG.warn(
+                                "Detected out-of-order KTable update for {}, "
+                                    + "old timestamp=[{}] new timestamp=[{}]. "
+                                    + "value=[{}]. Topic, partition and offset not known.",

Review comment:
       and here

##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadTest.java
##########
@@ -90,6 +92,13 @@ public String newStoreName(final String prefix) {
                 "store-"
             );
 
+        final ProcessorSupplier<Object, Object, Void, Void> processorSupplier = () ->
+            new ContextualProcessor<Object, Object, Void, Void>() {
+                @Override
+                public void process(final Record<Object, Object> record) {
+                }

Review comment:
       Huh, I'm surprised this works; I would have expected that the processor has to put the record in the store. If the test does still pass this way, it might reveal that the test is actually not evaluating anything.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] jeqo commented on a change in pull request #11099: KAFKA-10542: Migrate KTable mapValues, passthrough, and source to new Processor API

Posted by GitBox <gi...@apache.org>.
jeqo commented on a change in pull request #11099:
URL: https://github.com/apache/kafka/pull/11099#discussion_r675036126



##########
File path: streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableSourceTest.java
##########
@@ -156,7 +156,7 @@ public void kTableShouldLogAndMeterOnSkippedRecords() {
                     .filter(e -> e.getLevel().equals("WARN"))
                     .map(Event::getMessage)
                     .collect(Collectors.toList()),
-                hasItem("Skipping record due to null key. topic=[topic] partition=[0] offset=[0]")
+                hasItem("Skipping record due to null key. value=[value] topic=[topic] partition=[0] offset=[0]")

Review comment:
       > On another note, I guess we could add a test for the other (new) code path when the metadata is absent. I'll leave it up to you.
   
   Happy to add more tests. What I'm wondering is how to test scenarios where no context is added 🤔 ? TopologyTestDriver allows to nullify this?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] vvcephei commented on a change in pull request #11099: KAFKA-10542: Migrate KTable mapValues, passthrough, and source to new Processor API

Posted by GitBox <gi...@apache.org>.
vvcephei commented on a change in pull request #11099:
URL: https://github.com/apache/kafka/pull/11099#discussion_r674135420



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
##########
@@ -558,14 +550,14 @@ public final void addStateStore(final StoreBuilder<?> storeBuilder,
         nodeGroups = null;
     }
 
-    public final <KIn, VIn> void addGlobalStore(final StoreBuilder<?> storeBuilder,
+    public final <KIn, VIn, KOut, VOut> void addGlobalStore(final StoreBuilder<?> storeBuilder,

Review comment:
       Huh, I was surprised to see this change, but I figured out it's because of the below test. That test seems to be abusing the source node processor, so let's roll back this change and fix that test.

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableMapValues.java
##########
@@ -120,19 +123,19 @@ public void init(final org.apache.kafka.streams.processor.ProcessorContext conte
         }
 
         @Override
-        public void process(final K key, final Change<V> change) {
-            final V1 newValue = computeValue(key, change.newValue);
-            final V1 oldValue = computeOldValue(key, change);
+        public void process(final Record<KIn, Change<VIn>> record) {
+            final VOut newValue = computeValue(record.key(), record.value().newValue);
+            final VOut oldValue = computeOldValue(record.key(), record.value());
 
             if (queryableName != null) {
-                store.put(key, ValueAndTimestamp.make(newValue, context().timestamp()));
-                tupleForwarder.maybeForward(key, newValue, oldValue);
+                store.put(record.key(), ValueAndTimestamp.make(newValue, record.timestamp()));
+                tupleForwarder.maybeForward(record.key(), newValue, oldValue);

Review comment:
       During review, it struck me that we're just internally going to turn this back into a Record. We can update the internal API to accept the new forwarding type, but we don't need to tack that on to this PR.
   
   I filed https://issues.apache.org/jira/browse/KAFKA-13117 so we don't forget.

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSource.java
##########
@@ -92,33 +97,36 @@ public void init(final org.apache.kafka.streams.processor.ProcessorContext conte
         }
 
         @Override
-        public void process(final K key, final V value) {
+        public void process(final Record<KIn, VIn> record) {
             // if the key is null, then ignore the record
-            if (key == null) {
-                LOG.warn(
+            if (record.key() == null) {
+                context.recordMetadata().ifPresent(recordMetadata -> LOG.warn(
                     "Skipping record due to null key. topic=[{}] partition=[{}] offset=[{}]",
-                    context().topic(), context().partition(), context().offset()
-                );
+                    recordMetadata.topic(), recordMetadata.partition(), recordMetadata.offset()
+                ));
                 droppedRecordsSensor.record();
                 return;
             }
 
             if (queryableName != null) {
-                final ValueAndTimestamp<V> oldValueAndTimestamp = store.get(key);
-                final V oldValue;
+                final ValueAndTimestamp<VIn> oldValueAndTimestamp = store.get(record.key());
+                final VIn oldValue;
                 if (oldValueAndTimestamp != null) {
                     oldValue = oldValueAndTimestamp.value();
-                    if (context().timestamp() < oldValueAndTimestamp.timestamp()) {
-                        LOG.warn("Detected out-of-order KTable update for {} at offset {}, partition {}.",
-                            store.name(), context().offset(), context().partition());
+                    if (record.timestamp() < oldValueAndTimestamp.timestamp()) {
+                        context.recordMetadata().ifPresent(recordMetadata ->
+                            LOG.warn(
+                                "Detected out-of-order KTable update for {} at offset {}, partition {}.",

Review comment:
       Also, this is not related to your change, but it occurs to me that we're missing some important information here, which would likely save people a lot of time. Can we add the new and old timestamps to the log messages?

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSource.java
##########
@@ -92,33 +97,36 @@ public void init(final org.apache.kafka.streams.processor.ProcessorContext conte
         }
 
         @Override
-        public void process(final K key, final V value) {
+        public void process(final Record<KIn, VIn> record) {
             // if the key is null, then ignore the record
-            if (key == null) {
-                LOG.warn(
+            if (record.key() == null) {
+                context.recordMetadata().ifPresent(recordMetadata -> LOG.warn(

Review comment:
       Nice touch! Since we only need the metadata for the context in the log, what do you think about logging `"Skipping record due to null key. Topic, partition, and offset not known."` if the metadata is absent?

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSource.java
##########
@@ -92,33 +97,36 @@ public void init(final org.apache.kafka.streams.processor.ProcessorContext conte
         }
 
         @Override
-        public void process(final K key, final V value) {
+        public void process(final Record<KIn, VIn> record) {
             // if the key is null, then ignore the record
-            if (key == null) {
-                LOG.warn(
+            if (record.key() == null) {
+                context.recordMetadata().ifPresent(recordMetadata -> LOG.warn(
                     "Skipping record due to null key. topic=[{}] partition=[{}] offset=[{}]",
-                    context().topic(), context().partition(), context().offset()
-                );
+                    recordMetadata.topic(), recordMetadata.partition(), recordMetadata.offset()
+                ));
                 droppedRecordsSensor.record();
                 return;
             }
 
             if (queryableName != null) {
-                final ValueAndTimestamp<V> oldValueAndTimestamp = store.get(key);
-                final V oldValue;
+                final ValueAndTimestamp<VIn> oldValueAndTimestamp = store.get(record.key());
+                final VIn oldValue;
                 if (oldValueAndTimestamp != null) {
                     oldValue = oldValueAndTimestamp.value();
-                    if (context().timestamp() < oldValueAndTimestamp.timestamp()) {
-                        LOG.warn("Detected out-of-order KTable update for {} at offset {}, partition {}.",
-                            store.name(), context().offset(), context().partition());
+                    if (record.timestamp() < oldValueAndTimestamp.timestamp()) {
+                        context.recordMetadata().ifPresent(recordMetadata ->
+                            LOG.warn(
+                                "Detected out-of-order KTable update for {} at offset {}, partition {}.",

Review comment:
       Similar here. If it's absent, maybe we can say `"Detected out-of-order KTable update for {}. Partition and offset not known."`

##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadTest.java
##########
@@ -98,7 +98,7 @@ public String newStoreName(final String prefix) {
             null,
             GLOBAL_STORE_TOPIC_NAME,
             "processorName",
-            () -> ProcessorAdapter.adapt(new KTableSource<>(GLOBAL_STORE_NAME, GLOBAL_STORE_NAME).get()));
+            () -> new KTableSource<>(GLOBAL_STORE_NAME, GLOBAL_STORE_NAME).get());

Review comment:
       I was surprised that only this test was driving that builder change, so I went looking for the usages in production code. If you look at GlobalStoreNode, you'll see that it doesn't use KTableSource, but instead just takes a `ProcessorSupplier<KIn, VIn, Void, Void>`.
   
   In other words, this is a case where the new, stricter, API is revealing a problem with our code, not the other way around.
   
   Can we just write a quick ProcessorSupplier implementation here suitable to make the test pass?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] jeqo commented on a change in pull request #11099: KAFKA-10542: Migrate KTable mapValues, passthrough, and source to new Processor API

Posted by GitBox <gi...@apache.org>.
jeqo commented on a change in pull request #11099:
URL: https://github.com/apache/kafka/pull/11099#discussion_r674249144



##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSource.java
##########
@@ -92,33 +97,36 @@ public void init(final org.apache.kafka.streams.processor.ProcessorContext conte
         }
 
         @Override
-        public void process(final K key, final V value) {
+        public void process(final Record<KIn, VIn> record) {
             // if the key is null, then ignore the record
-            if (key == null) {
-                LOG.warn(
+            if (record.key() == null) {
+                context.recordMetadata().ifPresent(recordMetadata -> LOG.warn(

Review comment:
       Agreed. I added some more context to the messages and make them consistent across the class. Let me know it looks.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] jeqo commented on a change in pull request #11099: KAFKA-10542: Migrate KTable mapValues, passthrough, and source to new Processor API

Posted by GitBox <gi...@apache.org>.
jeqo commented on a change in pull request #11099:
URL: https://github.com/apache/kafka/pull/11099#discussion_r675002211



##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSource.java
##########
@@ -92,33 +98,62 @@ public void init(final org.apache.kafka.streams.processor.ProcessorContext conte
         }
 
         @Override
-        public void process(final K key, final V value) {
+        public void process(final Record<KIn, VIn> record) {
             // if the key is null, then ignore the record
-            if (key == null) {
-                LOG.warn(
-                    "Skipping record due to null key. topic=[{}] partition=[{}] offset=[{}]",
-                    context().topic(), context().partition(), context().offset()
-                );
+            if (record.key() == null) {
+                if (context.recordMetadata().isPresent()) {
+                    final RecordMetadata recordMetadata = context.recordMetadata().get();
+                    LOG.warn(
+                        "Skipping record due to null key. "
+                            + "value=[{}] topic=[{}] partition=[{}] offset=[{}]",

Review comment:
       No worries. This is actually a good findings, as there are other 16 places where we are logging value and keys.  Maybe let's create a issue to handle this in another PR? 
   
   <img width="955" alt="image" src="https://user-images.githubusercontent.com/6180701/126681148-2bd07547-5f56-4cc8-a9f1-88b4826b983e.png">
   




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] vvcephei commented on a change in pull request #11099: KAFKA-10542: Migrate KTable mapValues, passthrough, and source to new Processor API

Posted by GitBox <gi...@apache.org>.
vvcephei commented on a change in pull request #11099:
URL: https://github.com/apache/kafka/pull/11099#discussion_r678766017



##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadTest.java
##########
@@ -90,6 +92,13 @@ public String newStoreName(final String prefix) {
                 "store-"
             );
 
+        final ProcessorSupplier<Object, Object, Void, Void> processorSupplier = () ->
+            new ContextualProcessor<Object, Object, Void, Void>() {
+                @Override
+                public void process(final Record<Object, Object> record) {
+                }

Review comment:
       Thanks for the context. While that might be a good idea, let's keep these PRs more strictly scoped. I'm afraid adding in extra stuff will make it harder for people in the future to understand which changes were part of this transition and which weren't.
   
   Offhand, I'd guess that there's another test somewhere verifying that global stores actually function properly. I think you're right: this test is more about verifying that the thread implementation works as desired.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] jeqo commented on a change in pull request #11099: KAFKA-10542: Migrate KTable mapValues, passthrough, and source to new Processor API

Posted by GitBox <gi...@apache.org>.
jeqo commented on a change in pull request #11099:
URL: https://github.com/apache/kafka/pull/11099#discussion_r674249547



##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadTest.java
##########
@@ -98,7 +98,7 @@ public String newStoreName(final String prefix) {
             null,
             GLOBAL_STORE_TOPIC_NAME,
             "processorName",
-            () -> ProcessorAdapter.adapt(new KTableSource<>(GLOBAL_STORE_NAME, GLOBAL_STORE_NAME).get()));
+            () -> new KTableSource<>(GLOBAL_STORE_NAME, GLOBAL_STORE_NAME).get());

Review comment:
       Oh! great catch! You're right, I've rolled back the change and test is working. 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] vvcephei merged pull request #11099: KAFKA-10542: Migrate KTable mapValues, passthrough, and source to new Processor API

Posted by GitBox <gi...@apache.org>.
vvcephei merged pull request #11099:
URL: https://github.com/apache/kafka/pull/11099


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org