Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/07/21 16:22:54 UTC

[GitHub] [kafka] vvcephei commented on a change in pull request #11099: KAFKA-10542: Migrate KTable mapValues, passthrough, and source to new Processor API

vvcephei commented on a change in pull request #11099:
URL: https://github.com/apache/kafka/pull/11099#discussion_r674135420



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
##########
@@ -558,14 +550,14 @@ public final void addStateStore(final StoreBuilder<?> storeBuilder,
         nodeGroups = null;
     }
 
-    public final <KIn, VIn> void addGlobalStore(final StoreBuilder<?> storeBuilder,
+    public final <KIn, VIn, KOut, VOut> void addGlobalStore(final StoreBuilder<?> storeBuilder,

Review comment:
       Huh, I was surprised to see this change, but I figured out it's because of the GlobalStreamThreadTest change below. That test seems to be abusing the source node processor, so let's roll back this change and fix the test instead.

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableMapValues.java
##########
@@ -120,19 +123,19 @@ public void init(final org.apache.kafka.streams.processor.ProcessorContext conte
         }
 
         @Override
-        public void process(final K key, final Change<V> change) {
-            final V1 newValue = computeValue(key, change.newValue);
-            final V1 oldValue = computeOldValue(key, change);
+        public void process(final Record<KIn, Change<VIn>> record) {
+            final VOut newValue = computeValue(record.key(), record.value().newValue);
+            final VOut oldValue = computeOldValue(record.key(), record.value());
 
             if (queryableName != null) {
-                store.put(key, ValueAndTimestamp.make(newValue, context().timestamp()));
-                tupleForwarder.maybeForward(key, newValue, oldValue);
+                store.put(record.key(), ValueAndTimestamp.make(newValue, record.timestamp()));
+                tupleForwarder.maybeForward(record.key(), newValue, oldValue);

Review comment:
       During review, it struck me that the tuple forwarder is just going to turn this back into a Record internally. We can update the internal API to accept the new forwarding type, but we don't need to tack that onto this PR.
   
   I filed https://issues.apache.org/jira/browse/KAFKA-13117 so we don't forget.
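A rough sketch of the follow-up tracked in KAFKA-13117, assuming a hypothetical `maybeForward` overload that takes the record directly. `SimpleRecord`, `Change`, and `RecordTupleForwarder` are simplified stand-ins written for illustration, not the real Kafka Streams classes, and dropping the old value unless `sendOldValues` is set is an assumption about how the existing forwarder behaves.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified stand-in for Kafka Streams' Change type.
final class Change<V> {
    final V newValue;
    final V oldValue;

    Change(final V newValue, final V oldValue) {
        this.newValue = newValue;
        this.oldValue = oldValue;
    }
}

// Hypothetical, simplified stand-in for the new API's Record type.
final class SimpleRecord<K, V> {
    private final K key;
    private final V value;
    private final long timestamp;

    SimpleRecord(final K key, final V value, final long timestamp) {
        this.key = key;
        this.value = value;
        this.timestamp = timestamp;
    }

    K key() { return key; }
    V value() { return value; }
    long timestamp() { return timestamp; }

    // Copy with a new value; key and timestamp carry over unchanged.
    <NewV> SimpleRecord<K, NewV> withValue(final NewV newValue) {
        return new SimpleRecord<>(key, newValue, timestamp);
    }
}

// Hypothetical forwarder that accepts the record instead of (key, newValue, oldValue).
// It collects forwarded records in a list rather than calling context.forward().
final class RecordTupleForwarder<K, V> {
    final List<SimpleRecord<K, Change<V>>> forwarded = new ArrayList<>();
    private final boolean sendOldValues;

    RecordTupleForwarder(final boolean sendOldValues) {
        this.sendOldValues = sendOldValues;
    }

    void maybeForward(final SimpleRecord<K, Change<V>> record) {
        // Keep the old value only if downstream asked for it (assumed behavior).
        final Change<V> change = sendOldValues
            ? record.value()
            : new Change<>(record.value().newValue, null);
        forwarded.add(record.withValue(change));
    }
}
```

At the call site above, the processor could then forward with something like `tupleForwarder.maybeForward(record.withValue(new Change<>(newValue, oldValue)))`, which keeps the input record's key and timestamp without unpacking them.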

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSource.java
##########
@@ -92,33 +97,36 @@ public void init(final org.apache.kafka.streams.processor.ProcessorContext conte
         }
 
         @Override
-        public void process(final K key, final V value) {
+        public void process(final Record<KIn, VIn> record) {
             // if the key is null, then ignore the record
-            if (key == null) {
-                LOG.warn(
+            if (record.key() == null) {
+                context.recordMetadata().ifPresent(recordMetadata -> LOG.warn(
                     "Skipping record due to null key. topic=[{}] partition=[{}] offset=[{}]",
-                    context().topic(), context().partition(), context().offset()
-                );
+                    recordMetadata.topic(), recordMetadata.partition(), recordMetadata.offset()
+                ));
                 droppedRecordsSensor.record();
                 return;
             }
 
             if (queryableName != null) {
-                final ValueAndTimestamp<V> oldValueAndTimestamp = store.get(key);
-                final V oldValue;
+                final ValueAndTimestamp<VIn> oldValueAndTimestamp = store.get(record.key());
+                final VIn oldValue;
                 if (oldValueAndTimestamp != null) {
                     oldValue = oldValueAndTimestamp.value();
-                    if (context().timestamp() < oldValueAndTimestamp.timestamp()) {
-                        LOG.warn("Detected out-of-order KTable update for {} at offset {}, partition {}.",
-                            store.name(), context().offset(), context().partition());
+                    if (record.timestamp() < oldValueAndTimestamp.timestamp()) {
+                        context.recordMetadata().ifPresent(recordMetadata ->
+                            LOG.warn(
+                                "Detected out-of-order KTable update for {} at offset {}, partition {}.",

Review comment:
       Also, this is not related to your change, but it occurs to me that we're missing some important information here, which would likely save people a lot of time. Can we add the new and old timestamps to the log messages?
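A minimal sketch of the extended warning, assuming the new timestamp is `record.timestamp()` and the old one is `oldValueAndTimestamp.timestamp()`. The `OutOfOrderMessage` helper and the exact wording are hypothetical; it formats eagerly with `String.format` only so the output is easy to check, whereas the real code would pass these values as SLF4J `{}` arguments to `LOG.warn`.

```java
// Hypothetical helper showing which values the out-of-order warning could carry.
final class OutOfOrderMessage {
    private OutOfOrderMessage() { }

    // oldTimestamp: timestamp already in the store; newTimestamp: timestamp of the incoming record.
    static String format(final String storeName, final long offset, final int partition,
                         final long oldTimestamp, final long newTimestamp) {
        return String.format(
            "Detected out-of-order KTable update for %s at offset %d, partition %d."
                + " Old timestamp: %d, new timestamp: %d.",
            storeName, offset, partition, oldTimestamp, newTimestamp);
    }
}
```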

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSource.java
##########
@@ -92,33 +97,36 @@ public void init(final org.apache.kafka.streams.processor.ProcessorContext conte
         }
 
         @Override
-        public void process(final K key, final V value) {
+        public void process(final Record<KIn, VIn> record) {
             // if the key is null, then ignore the record
-            if (key == null) {
-                LOG.warn(
+            if (record.key() == null) {
+                context.recordMetadata().ifPresent(recordMetadata -> LOG.warn(

Review comment:
       Nice touch! Since we only need the metadata for context in the log message, what do you think about logging `"Skipping record due to null key. Topic, partition, and offset not known."` when the metadata is absent, rather than skipping the warning altogether?
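A minimal sketch of the suggested fallback, assuming a hypothetical `Metadata` stand-in for the new API's `RecordMetadata` and a hypothetical `NullKeyWarning` helper that returns the message instead of logging it. In the processor, the two branches would become two `LOG.warn` calls on `context.recordMetadata()`, with `droppedRecordsSensor.record()` and the early return left unchanged.

```java
import java.util.Optional;

// Hypothetical stand-in for RecordMetadata (topic, partition, offset).
final class Metadata {
    private final String topic;
    private final int partition;
    private final long offset;

    Metadata(final String topic, final int partition, final long offset) {
        this.topic = topic;
        this.partition = partition;
        this.offset = offset;
    }

    String topic() { return topic; }
    int partition() { return partition; }
    long offset() { return offset; }
}

final class NullKeyWarning {
    private NullKeyWarning() { }

    // Always produces a warning, whether or not metadata is available.
    static String message(final Optional<Metadata> metadata) {
        if (metadata.isPresent()) {
            final Metadata m = metadata.get();
            return String.format(
                "Skipping record due to null key. topic=[%s] partition=[%d] offset=[%d]",
                m.topic(), m.partition(), m.offset());
        }
        return "Skipping record due to null key. Topic, partition, and offset not known.";
    }
}
```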

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSource.java
##########
@@ -92,33 +97,36 @@ public void init(final org.apache.kafka.streams.processor.ProcessorContext conte
         }
 
         @Override
-        public void process(final K key, final V value) {
+        public void process(final Record<KIn, VIn> record) {
             // if the key is null, then ignore the record
-            if (key == null) {
-                LOG.warn(
+            if (record.key() == null) {
+                context.recordMetadata().ifPresent(recordMetadata -> LOG.warn(
                     "Skipping record due to null key. topic=[{}] partition=[{}] offset=[{}]",
-                    context().topic(), context().partition(), context().offset()
-                );
+                    recordMetadata.topic(), recordMetadata.partition(), recordMetadata.offset()
+                ));
                 droppedRecordsSensor.record();
                 return;
             }
 
             if (queryableName != null) {
-                final ValueAndTimestamp<V> oldValueAndTimestamp = store.get(key);
-                final V oldValue;
+                final ValueAndTimestamp<VIn> oldValueAndTimestamp = store.get(record.key());
+                final VIn oldValue;
                 if (oldValueAndTimestamp != null) {
                     oldValue = oldValueAndTimestamp.value();
-                    if (context().timestamp() < oldValueAndTimestamp.timestamp()) {
-                        LOG.warn("Detected out-of-order KTable update for {} at offset {}, partition {}.",
-                            store.name(), context().offset(), context().partition());
+                    if (record.timestamp() < oldValueAndTimestamp.timestamp()) {
+                        context.recordMetadata().ifPresent(recordMetadata ->
+                            LOG.warn(
+                                "Detected out-of-order KTable update for {} at offset {}, partition {}.",

Review comment:
       Similar here. If the metadata is absent, maybe we can log `"Detected out-of-order KTable update for {}. Partition and offset not known."`
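The same idea for the out-of-order warning, sketched with `Optional.map` and `orElseGet` so that exactly one message is always produced. `Metadata` and `OutOfOrderWarning` are hypothetical stand-ins, and the message is returned rather than logged.

```java
import java.util.Optional;

// Hypothetical stand-in for RecordMetadata; topic() is omitted because this message doesn't use it.
final class Metadata {
    private final int partition;
    private final long offset;

    Metadata(final int partition, final long offset) {
        this.partition = partition;
        this.offset = offset;
    }

    int partition() { return partition; }
    long offset() { return offset; }
}

final class OutOfOrderWarning {
    private OutOfOrderWarning() { }

    // Uses offset and partition when known, otherwise falls back to the store name alone.
    static String message(final String storeName, final Optional<Metadata> metadata) {
        return metadata
            .map(m -> String.format(
                "Detected out-of-order KTable update for %s at offset %d, partition %d.",
                storeName, m.offset(), m.partition()))
            .orElseGet(() -> String.format(
                "Detected out-of-order KTable update for %s. Partition and offset not known.",
                storeName));
    }
}
```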

##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadTest.java
##########
@@ -98,7 +98,7 @@ public String newStoreName(final String prefix) {
             null,
             GLOBAL_STORE_TOPIC_NAME,
             "processorName",
-            () -> ProcessorAdapter.adapt(new KTableSource<>(GLOBAL_STORE_NAME, GLOBAL_STORE_NAME).get()));
+            () -> new KTableSource<>(GLOBAL_STORE_NAME, GLOBAL_STORE_NAME).get());

Review comment:
       I was surprised that only this test was driving that builder change, so I went looking for the usages in production code. If you look at GlobalStoreNode, you'll see that it doesn't use KTableSource, but instead just takes a `ProcessorSupplier<KIn, VIn, Void, Void>`.
   
   In other words, this is a case where the new, stricter API is revealing a problem with our code, not the other way around.
   
   Can we just write a quick ProcessorSupplier implementation here suitable to make the test pass?
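One possible shape for that test-only supplier, sketched against simplified stand-ins: `SimpleRecord`, `Processor`, and `ProcessorSupplier` below are hypothetical minimal versions of the `processor.api` types, and a plain `Map` stands in for the global store. A real version would more likely fetch the store in `init()` via `context.getStateStore(...)`; the point here is only that the supplier's output types are `Void, Void`, so it fits `addGlobalStore` without widening its type parameters.

```java
import java.util.Map;

// Hypothetical, simplified stand-ins for the new Processor API; not the real interfaces.
final class SimpleRecord<K, V> {
    private final K key;
    private final V value;

    SimpleRecord(final K key, final V value) {
        this.key = key;
        this.value = value;
    }

    K key() { return key; }
    V value() { return value; }
}

interface Processor<KIn, VIn, KOut, VOut> {
    void process(SimpleRecord<KIn, VIn> record);
}

interface ProcessorSupplier<KIn, VIn, KOut, VOut> {
    Processor<KIn, VIn, KOut, VOut> get();
}

// Test-only supplier: writes each record into the store and never forwards,
// so its output types are Void, Void.
final class GlobalStoreUpdater<K, V> implements ProcessorSupplier<K, V, Void, Void> {
    private final Map<K, V> store; // a plain map standing in for the global state store

    GlobalStoreUpdater(final Map<K, V> store) {
        this.store = store;
    }

    @Override
    public Processor<K, V, Void, Void> get() {
        return record -> {
            // Treat a null value as a delete, following the usual tombstone convention.
            if (record.value() == null) {
                store.remove(record.key());
            } else {
                store.put(record.key(), record.value());
            }
        };
    }
}
```

In the test, a supplier along these lines would take the place of the `KTableSource` lambda passed to the builder.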



