You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/05/31 13:11:16 UTC

[GitHub] [kafka] cadonna commented on a change in pull request #10507: KAFKA-8410: Migrating stateful operators to new Processor API

cadonna commented on a change in pull request #10507:
URL: https://github.com/apache/kafka/pull/10507#discussion_r642403697



##########
File path: streams/src/main/java/org/apache/kafka/streams/errors/LogAndContinueExceptionHandler.java
##########
@@ -32,7 +32,7 @@
     private static final Logger log = LoggerFactory.getLogger(LogAndContinueExceptionHandler.class);
 
     @Override
-    public DeserializationHandlerResponse handle(final ProcessorContext context,
+    public DeserializationHandlerResponse handle(final ProcessorContext<?, ?> context,

Review comment:
       Do we need to deprecate also this method and add a new one? Technically, it is a class of the public API that can be extended.  

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/CogroupedStreamAggregateBuilder.java
##########
@@ -48,21 +48,20 @@
     CogroupedStreamAggregateBuilder(final InternalStreamsBuilder builder) {
         this.builder = builder;
     }
-    @SuppressWarnings("unchecked")
-    <KR> KTable<KR, VOut> build(final Map<KGroupedStreamImpl<K, ?>, Aggregator<? super K, ? super Object, VOut>> groupPatterns,
-                                final Initializer<VOut> initializer,
-                                final NamedInternal named,
-                                final StoreBuilder<?> storeBuilder,
-                                final Serde<KR> keySerde,
-                                final Serde<VOut> valueSerde,
-                                final String queryableName) {
+    <KOut> KTable<KOut, VOut> build(final Map<KGroupedStreamImpl<K, ?>, Aggregator<? super K, ? super Object, VOut>> groupPatterns,
+                                    final Initializer<VOut> initializer,
+                                    final NamedInternal named,
+                                    final StoreBuilder<?> storeBuilder,
+                                    final Serde<KOut> keySerde,
+                                    final Serde<VOut> valueSerde,
+                                    final String queryableName) {
         processRepartitions(groupPatterns, storeBuilder);
         final Collection<GraphNode> processors = new ArrayList<>();
-        final Collection<KStreamAggProcessorSupplier> parentProcessors = new ArrayList<>();
+        final Collection<KStreamAggregateProcessorSupplier> parentProcessors = new ArrayList<>();
         boolean stateCreated = false;
         int counter = 0;
         for (final Entry<KGroupedStreamImpl<K, ?>, Aggregator<? super K, Object, VOut>> kGroupedStream : groupPatterns.entrySet()) {
-            final KStreamAggProcessorSupplier<K, K, ?, ?> parentProcessor =
+            final KStreamAggregateProcessorSupplier<K, K, ?, ?> parentProcessor =

Review comment:
       Shouldn't this be `KStreamAggregateProcessorSupplier<K, ?, K, ?>`? The positions of the parameters `KOut` and `VIn` on `KStreamAggregateProcessorSupplier` changed with respect to `KStreamAggProcessorSupplier`.

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java
##########
@@ -183,7 +182,9 @@
 
         final String name = new NamedInternal(named).orElseGenerateWithPrefix(builder, AGGREGATE_NAME);
         return doAggregate(
-            new KStreamAggregate<>(materializedInternal.storeName(), aggregateBuilder.countInitializer, aggregateBuilder.countAggregator),
+            new KStreamAggregate<>(materializedInternal.storeName(),
+                aggregateBuilder.countInitializer,
+                aggregateBuilder.countAggregator),

Review comment:
       ```suggestion
               new KStreamAggregate<>(
                   materializedInternal.storeName(),
                   aggregateBuilder.countInitializer,
                   aggregateBuilder.countAggregator
               ),
   ```
   or
   ```suggestion
               new KStreamAggregate<>(
                   materializedInternal.storeName(),
                   aggregateBuilder.countInitializer,
                   aggregateBuilder.countAggregator),
   ```

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java
##########
@@ -16,6 +16,8 @@
  */
 package org.apache.kafka.streams.kstream.internals;
 
+import java.util.Objects;
+import java.util.Set;

Review comment:
       See my comment above about import order.

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractStream.java
##########
@@ -16,6 +16,10 @@
  */
 package org.apache.kafka.streams.kstream.internals;
 
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Objects;
+import java.util.Set;

Review comment:
       In [KAFKA-10787](https://issues.apache.org/jira/browse/KAFKA-10787) we agreed on an import order `kafka`, `org.apache.kafka`, `com`, `net`, `org`, `java`, `javax` and static imports. Additionally, there should be a empty line between import blocks.
   
   Note, PR #10428 introduces check and a formatter for this.

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
##########
@@ -26,29 +26,29 @@
 import org.apache.kafka.streams.Topology;
 import org.apache.kafka.streams.processor.ConnectedStoreProvider;
 import org.apache.kafka.streams.processor.Processor;
-import org.apache.kafka.streams.processor.ProcessorContext;
-import org.apache.kafka.streams.processor.ProcessorSupplier;
 import org.apache.kafka.streams.processor.StreamPartitioner;
 import org.apache.kafka.streams.processor.TopicNameExtractor;
+import org.apache.kafka.streams.processor.api.ProcessorContext;
+import org.apache.kafka.streams.processor.api.ProcessorSupplier;
 import org.apache.kafka.streams.state.KeyValueStore;

Review comment:
       I skimmed over this interface and changes are in line breaks of comments and renaming of type parameters. In the interest of good reviews, I would not do those changes in this PR but rather open a separate PR for this interface. However, I might have missed an important part. @jeqo Could you clarify?
   
   Regarding the comments, we usually add a break after each sentence.  

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/CogroupedStreamAggregateBuilder.java
##########
@@ -80,24 +79,23 @@
         return createTable(processors, parentProcessors, named, keySerde, valueSerde, queryableName, storeBuilder.name());
     }
 
-    @SuppressWarnings("unchecked")
-    <KR, W extends Window> KTable<KR, VOut> build(final Map<KGroupedStreamImpl<K, ?>, Aggregator<? super K, ? super Object, VOut>> groupPatterns,
-                                                  final Initializer<VOut> initializer,
-                                                  final NamedInternal named,
-                                                  final StoreBuilder<?> storeBuilder,
-                                                  final Serde<KR> keySerde,
-                                                  final Serde<VOut> valueSerde,
-                                                  final String queryableName,
-                                                  final Windows<W> windows) {
+    <KOut, W extends Window> KTable<KOut, VOut> build(final Map<KGroupedStreamImpl<K, ?>, Aggregator<? super K, ? super Object, VOut>> groupPatterns,
+                                                      final Initializer<VOut> initializer,
+                                                      final NamedInternal named,
+                                                      final StoreBuilder<?> storeBuilder,
+                                                      final Serde<KOut> keySerde,
+                                                      final Serde<VOut> valueSerde,
+                                                      final String queryableName,
+                                                      final Windows<W> windows) {
         processRepartitions(groupPatterns, storeBuilder);
 
         final Collection<GraphNode> processors = new ArrayList<>();
-        final Collection<KStreamAggProcessorSupplier> parentProcessors = new ArrayList<>();
+        final Collection<KStreamAggregateProcessorSupplier> parentProcessors = new ArrayList<>();
         boolean stateCreated = false;
         int counter = 0;
         for (final Entry<KGroupedStreamImpl<K, ?>, Aggregator<? super K, Object, VOut>> kGroupedStream : groupPatterns.entrySet()) {
-            final KStreamAggProcessorSupplier<K, K, ?, ?>  parentProcessor =
-                (KStreamAggProcessorSupplier<K, K, ?, ?>) new KStreamWindowAggregate<K, K, VOut, W>(
+            final KStreamWindowAggregate<K, K, VOut, W> parentProcessor =

Review comment:
       Shouldn't this be `KStreamWindowAggregate<K, VOut, K, W>`? Here I am not sure if I am missing something since the type parameter positions did not change. Why is the type parameter for `V` in `KStreamWindowAggregate` `K` and not `?`?

##########
File path: streams/src/main/java/org/apache/kafka/streams/errors/LogAndFailExceptionHandler.java
##########
@@ -32,7 +32,7 @@
     private static final Logger log = LoggerFactory.getLogger(LogAndFailExceptionHandler.class);
 
     @Override
-    public DeserializationHandlerResponse handle(final ProcessorContext context,
+    public DeserializationHandlerResponse handle(final ProcessorContext<?, ?> context,

Review comment:
       Do we need to deprecate also this method and add a new one? Technically, it is a class of the public API that can be extended.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org