Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/03/29 17:23:11 UTC

[GitHub] [kafka] vvcephei commented on a change in pull request #10381: KAFKA-12533: Migrating KStream Stateless operators to new Processor API

vvcephei commented on a change in pull request #10381:
URL: https://github.com/apache/kafka/pull/10381#discussion_r603468316



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/api/AbstractProcessor.java
##########
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.api;
+
+/**
+ * An abstract implementation of {@link Processor} that manages the {@link ProcessorContext} instance and provides default no-op
+ * implementation of {@link #close()}.
+ *
+ * @param <KIn> the type of input keys
+ * @param <VIn> the type of input values
+ * @param <KOut> the type of output keys
+ * @param <VOut> the type of output values
+ */
+public abstract class AbstractProcessor<KIn, VIn, KOut, VOut> implements Processor<KIn, VIn, KOut, VOut> {
+
+    protected ProcessorContext<KOut, VOut> context;
+
+    protected AbstractProcessor() {}
+
+    @Override
+    public void init(final ProcessorContext<KOut, VOut> context) {
+        this.context = context;
+    }
+
+    /**
+     * Close this processor and clean up any resources.
+     * <p>
+     * This method does nothing by default; if desired, subclasses should override it with custom functionality.
+     * </p>
+     */
+    @Override
+    public void close() {
+        // do nothing
+    }
+

Review comment:
       ```suggestion
   ```
   
   This no-op `close()` is the same as the default implementation in the `Processor` interface, so it can be removed.
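A minimal sketch of why the explicit no-op is redundant. It assumes simplified stand-ins for `Record`, `ProcessorContext` and `Processor` that only mirror the shape of the new `org.apache.kafka.streams.processor.api` types; they are not the real Kafka classes. `CollectingProcessor` is a hypothetical example that overrides only `process()`, so `init()` and `close()` come from the interface defaults.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified stand-ins that mirror the shape of the new PAPI types (not the real Kafka classes).
final class Record<K, V> {
    private final K key;
    private final V value;

    Record(final K key, final V value) {
        this.key = key;
        this.value = value;
    }

    K key() { return key; }

    V value() { return value; }
}

interface ProcessorContext<KOut, VOut> {
    void forward(Record<? extends KOut, ? extends VOut> record);
}

// Only process() is abstract; init() and close() have no-op defaults.
interface Processor<KIn, VIn, KOut, VOut> {
    default void init(final ProcessorContext<KOut, VOut> context) { }

    void process(Record<KIn, VIn> record);

    default void close() { }
}

// Hypothetical processor that overrides process() only.
// Calling close() on it runs the interface's default no-op.
final class CollectingProcessor<K, V> implements Processor<K, V, Void, Void> {
    final List<V> seen = new ArrayList<>();

    @Override
    public void process(final Record<K, V> record) {
        seen.add(record.value());
    }
}
```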

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/api/AbstractProcessor.java
##########
@@ -0,0 +1,58 @@
+public abstract class AbstractProcessor<KIn, VIn, KOut, VOut> implements Processor<KIn, VIn, KOut, VOut> {

Review comment:
       One minor thing that bugged me about the old PAPI was the AbstractProcessor class. It was widely used in places where it provided no value, and I'm worried that we will just perpetuate that here.
   
   Since the new API has default implementations for `init` and `close`, the only value this new abstract class provides is when a subclass needs the context. In that case, it saves the subclass the boilerplate of storing the context in a field.
   
   Therefore, I'd like to call this class `ContextualProcessor` instead of `AbstractProcessor`. This should make it a little harder to depend on this class unnecessarily.
   
   ```suggestion
   public abstract class ContextualProcessor<KIn, VIn, KOut, VOut> implements Processor<KIn, VIn, KOut, VOut> {
   ```
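The proposed `ContextualProcessor` could look roughly like this. This is a sketch, not the PR's code: it reuses simplified stand-in types, keeps the context in a private field behind a `context()` accessor (an assumption; the diff above uses a protected field), and `UpperCaseProcessor` is a hypothetical subclass that needs the context because it forwards.

```java
// Simplified stand-ins for the new PAPI types (not the real Kafka classes).
final class Record<K, V> {
    private final K key;
    private final V value;

    Record(final K key, final V value) {
        this.key = key;
        this.value = value;
    }

    K key() { return key; }

    V value() { return value; }
}

interface ProcessorContext<KOut, VOut> {
    void forward(Record<? extends KOut, ? extends VOut> record);
}

interface Processor<KIn, VIn, KOut, VOut> {
    default void init(final ProcessorContext<KOut, VOut> context) { }

    void process(Record<KIn, VIn> record);

    default void close() { }
}

// Sketch of ContextualProcessor: its only job is to remember the context passed to init().
abstract class ContextualProcessor<KIn, VIn, KOut, VOut> implements Processor<KIn, VIn, KOut, VOut> {
    private ProcessorContext<KOut, VOut> context;

    protected ContextualProcessor() { }

    @Override
    public void init(final ProcessorContext<KOut, VOut> context) {
        this.context = context;
    }

    protected final ProcessorContext<KOut, VOut> context() {
        return context;
    }
}

// Hypothetical subclass that genuinely needs the context, because it forwards.
final class UpperCaseProcessor<K> extends ContextualProcessor<K, String, K, String> {
    @Override
    public void process(final Record<K, String> record) {
        context().forward(new Record<>(record.key(), record.value().toUpperCase()));
    }
}
```

A processor that never forwards has no reason to extend such a class and can implement `Processor` directly.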

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamPeek.java
##########
@@ -32,16 +33,16 @@ public KStreamPeek(final ForeachAction<K, V> action, final boolean forwardDownSt
     }
 
     @Override
-    public Processor<K, V> get() {
+    public Processor<K, V, K, V> get() {
         return new KStreamPeekProcessor();
     }
 
-    private class KStreamPeekProcessor extends AbstractProcessor<K, V> {
+    private class KStreamPeekProcessor extends AbstractProcessor<K, V, K, V> {
         @Override
-        public void process(final K key, final V value) {
-            action.apply(key, value);
+        public void process(final Record<K, V> record) {
+            action.apply(record.key(), record.value());
             if (forwardDownStream) {
-                context().forward(key, value);
+                context().forward(record);
             }

Review comment:
       Hmm. What do you think about splitting this up and creating a ForEachProcessor that implements `Processor<K, V, Void, Void>` and therefore cannot forward, then simplifying this class to _always_ forward?
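A sketch of the proposed split. It assumes simplified stand-in types and uses `java.util.function.BiConsumer` in place of Kafka's `ForeachAction`. `ForeachProcessor` declares `Void` output types, so it has no meaningful key or value to forward, while `PeekProcessor` (a hypothetical name) drops the `forwardDownStream` flag and always forwards the incoming record unchanged.

```java
import java.util.function.BiConsumer;

// Simplified stand-ins for the new PAPI types (not the real Kafka classes).
final class Record<K, V> {
    private final K key;
    private final V value;

    Record(final K key, final V value) {
        this.key = key;
        this.value = value;
    }

    K key() { return key; }

    V value() { return value; }
}

interface ProcessorContext<KOut, VOut> {
    void forward(Record<? extends KOut, ? extends VOut> record);
}

interface Processor<KIn, VIn, KOut, VOut> {
    default void init(final ProcessorContext<KOut, VOut> context) { }

    void process(Record<KIn, VIn> record);

    default void close() { }
}

// Terminal processor: Void output types, so it only runs the action.
final class ForeachProcessor<K, V> implements Processor<K, V, Void, Void> {
    private final BiConsumer<? super K, ? super V> action;

    ForeachProcessor(final BiConsumer<? super K, ? super V> action) {
        this.action = action;
    }

    @Override
    public void process(final Record<K, V> record) {
        action.accept(record.key(), record.value());
    }
}

// Peek without a flag: run the action, then always forward the same record.
final class PeekProcessor<K, V> implements Processor<K, V, K, V> {
    private final BiConsumer<? super K, ? super V> action;
    private ProcessorContext<K, V> context;

    PeekProcessor(final BiConsumer<? super K, ? super V> action) {
        this.action = action;
    }

    @Override
    public void init(final ProcessorContext<K, V> context) {
        this.context = context;
    }

    @Override
    public void process(final Record<K, V> record) {
        action.accept(record.key(), record.value());
        context.forward(record);
    }
}
```

With the split, "should this forward?" is answered by the output types instead of a runtime boolean.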

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamPrint.java
##########
@@ -30,15 +31,15 @@ public KStreamPrint(final ForeachAction<K, V> action) {
     }
 
     @Override
-    public Processor<K, V> get() {
+    public Processor<K, V, K, V> get() {
         return new KStreamPrintProcessor();
     }
 
-    private class KStreamPrintProcessor extends AbstractProcessor<K, V> {
+    private class KStreamPrintProcessor extends AbstractProcessor<K, V, K, V> {

Review comment:
       ```suggestion
       private class KStreamPrintProcessor implements Processor<K, V, K, V> {
   ```
   See my comment on AbstractProcessor. Since we don't need the context, we don't need to depend on any abstract class (since the new Processor has defaults for init and close).

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
##########
@@ -484,7 +484,7 @@ public String queryableStoreName() {
         Objects.requireNonNull(named, "named can't be null");
 
         final String name = new NamedInternal(named).orElseGenerateWithPrefix(builder, TOSTREAM_NAME);
-        final ProcessorSupplier<K, Change<V>> kStreamMapValues = new KStreamMapValues<>((key, change) -> change.newValue);
+        final KStreamMapValues<K, Change<V>, V> kStreamMapValues = new KStreamMapValues<>((key, change) -> change.newValue);
         final ProcessorParameters<K, V, ?, ?> processorParameters = unsafeCastProcessorParametersToCompletelyDifferentType(

Review comment:
       Just FYI, @jeqo, during my POC I tried to fix this right away, and it dragged me into migrating the entire DSL at once. I think we should do what you are doing instead: just migrate the individual processors first, and _then_ come back and drop the unsafe casts in a later PR.

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMap.java
##########
@@ -18,28 +18,30 @@
 
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.kstream.KeyValueMapper;
-import org.apache.kafka.streams.processor.AbstractProcessor;
-import org.apache.kafka.streams.processor.Processor;
-import org.apache.kafka.streams.processor.ProcessorSupplier;
+import org.apache.kafka.streams.processor.api.AbstractProcessor;
+import org.apache.kafka.streams.processor.api.Processor;
+import org.apache.kafka.streams.processor.api.ProcessorSupplier;
+import org.apache.kafka.streams.processor.api.Record;
 
-class KStreamFlatMap<K, V, K1, V1> implements ProcessorSupplier<K, V> {
+class KStreamFlatMap<KIn, VIn, KOut, VOut> implements ProcessorSupplier<KIn, VIn, KOut, VOut> {
 
-    private final KeyValueMapper<? super K, ? super V, ? extends Iterable<? extends KeyValue<? extends K1, ? extends V1>>> mapper;
+    private final KeyValueMapper<? super KIn, ? super VIn, ? extends Iterable<? extends KeyValue<? extends KOut, ? extends VOut>>> mapper;
 
-    KStreamFlatMap(final KeyValueMapper<? super K, ? super V, ? extends Iterable<? extends KeyValue<? extends K1, ? extends V1>>> mapper) {
+    KStreamFlatMap(final KeyValueMapper<? super KIn, ? super VIn, ? extends Iterable<? extends KeyValue<? extends KOut, ? extends VOut>>> mapper) {
         this.mapper = mapper;
     }
 
     @Override
-    public Processor<K, V> get() {
+    public Processor<KIn, VIn, KOut, VOut> get() {
         return new KStreamFlatMapProcessor();
     }
 
-    private class KStreamFlatMapProcessor extends AbstractProcessor<K, V> {
+    private class KStreamFlatMapProcessor extends AbstractProcessor<KIn, VIn, KOut, VOut> {
         @Override
-        public void process(final K key, final V value) {
-            for (final KeyValue<? extends K1, ? extends V1> newPair : mapper.apply(key, value)) {
-                context().forward(newPair.key, newPair.value);
+        public void process(final Record<KIn, VIn> record) {
+            for (final KeyValue<? extends KOut, ? extends VOut> newPair :
+                mapper.apply(record.key(), record.value())) {

Review comment:
       This is a weird line break. It would be better to shorten the line by assigning the result of `mapper.apply` to a variable.
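A sketch of the suggested refactoring. It assumes simplified stand-ins for `Record`, `ProcessorContext`, `Processor` and `KeyValue`, and a `java.util.function.BiFunction` in place of `KeyValueMapper`. Binding the result of `mapper.apply` to a local variable keeps the for-each header on a single line.

```java
import java.util.function.BiFunction;

// Simplified stand-ins for the new PAPI types (not the real Kafka classes).
final class Record<K, V> {
    private final K key;
    private final V value;

    Record(final K key, final V value) {
        this.key = key;
        this.value = value;
    }

    K key() { return key; }

    V value() { return value; }
}

final class KeyValue<K, V> {
    final K key;
    final V value;

    KeyValue(final K key, final V value) {
        this.key = key;
        this.value = value;
    }
}

interface ProcessorContext<KOut, VOut> {
    void forward(Record<? extends KOut, ? extends VOut> record);
}

interface Processor<KIn, VIn, KOut, VOut> {
    default void init(final ProcessorContext<KOut, VOut> context) { }

    void process(Record<KIn, VIn> record);

    default void close() { }
}

final class FlatMapProcessor<KIn, VIn, KOut, VOut> implements Processor<KIn, VIn, KOut, VOut> {
    private final BiFunction<? super KIn, ? super VIn,
        ? extends Iterable<? extends KeyValue<? extends KOut, ? extends VOut>>> mapper;
    private ProcessorContext<KOut, VOut> context;

    FlatMapProcessor(final BiFunction<? super KIn, ? super VIn,
        ? extends Iterable<? extends KeyValue<? extends KOut, ? extends VOut>>> mapper) {
        this.mapper = mapper;
    }

    @Override
    public void init(final ProcessorContext<KOut, VOut> context) {
        this.context = context;
    }

    @Override
    public void process(final Record<KIn, VIn> record) {
        // Name the mapper result first so the loop header stays short.
        final Iterable<? extends KeyValue<? extends KOut, ? extends VOut>> newPairs =
            mapper.apply(record.key(), record.value());
        for (final KeyValue<? extends KOut, ? extends VOut> newPair : newPairs) {
            context.forward(new Record<>(newPair.key, newPair.value));
        }
    }
}
```

The stand-in `Record` has no timestamp or headers. With the real `Record`, forwarding `record.withKey(newPair.key).withValue(newPair.value)` would keep the input record's metadata.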

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamPrint.java
##########
@@ -17,11 +17,12 @@
 package org.apache.kafka.streams.kstream.internals;
 
 import org.apache.kafka.streams.kstream.ForeachAction;
-import org.apache.kafka.streams.processor.Processor;
-import org.apache.kafka.streams.processor.AbstractProcessor;
-import org.apache.kafka.streams.processor.ProcessorSupplier;
+import org.apache.kafka.streams.processor.api.AbstractProcessor;
+import org.apache.kafka.streams.processor.api.Processor;
+import org.apache.kafka.streams.processor.api.ProcessorSupplier;
+import org.apache.kafka.streams.processor.api.Record;
 
-public class KStreamPrint<K, V> implements ProcessorSupplier<K, V> {
+public class KStreamPrint<K, V> implements ProcessorSupplier<K, V, K, V> {

Review comment:
       ```suggestion
   public class KStreamPrint<K, V> implements ProcessorSupplier<K, V, Void, Void> {
   ```
   
   This class never forwards.
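With `Void` output types and no need for the context, the print supplier can return a processor that implements `Processor` directly, with no abstract base class. This is a sketch assuming simplified stand-in types (including a hypothetical `ProcessorSupplier` that just extends `java.util.function.Supplier`) and a `BiConsumer` in place of `ForeachAction`; it leaves out the real class's printing and resource handling.

```java
import java.util.function.BiConsumer;
import java.util.function.Supplier;

// Simplified stand-ins for the new PAPI types (not the real Kafka classes).
final class Record<K, V> {
    private final K key;
    private final V value;

    Record(final K key, final V value) {
        this.key = key;
        this.value = value;
    }

    K key() { return key; }

    V value() { return value; }
}

interface ProcessorContext<KOut, VOut> {
    void forward(Record<? extends KOut, ? extends VOut> record);
}

// Only process() is abstract, so a lambda can implement it.
interface Processor<KIn, VIn, KOut, VOut> {
    default void init(final ProcessorContext<KOut, VOut> context) { }

    void process(Record<KIn, VIn> record);

    default void close() { }
}

interface ProcessorSupplier<KIn, VIn, KOut, VOut> extends Supplier<Processor<KIn, VIn, KOut, VOut>> { }

// Output types are Void: this supplier's processors never forward.
final class PrintSupplier<K, V> implements ProcessorSupplier<K, V, Void, Void> {
    private final BiConsumer<? super K, ? super V> action;

    PrintSupplier(final BiConsumer<? super K, ? super V> action) {
        this.action = action;
    }

    @Override
    public Processor<K, V, Void, Void> get() {
        // No base class and no stored context: the processor only runs the action.
        return record -> action.accept(record.key(), record.value());
    }
}
```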



