Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2023/01/12 22:48:57 UTC

[GitHub] [kafka] mjsax commented on a diff in pull request #12742: KAFKA-10892: Shared Readonly State Stores ( revisited )

mjsax commented on code in PR #12742:
URL: https://github.com/apache/kafka/pull/12742#discussion_r1068740650


##########
streams/src/main/java/org/apache/kafka/streams/Topology.java:
##########
@@ -737,6 +737,85 @@ public synchronized Topology addStateStore(final StoreBuilder<?> storeBuilder,
         return this;
     }
 
+    /**
+     * Adds a read-only {@link StateStore} to the topology.
+     * <p>
+     * A read-only StateStore can use any compacted topic as a changelog. The <code>auto.offset.reset</code> property
+     * will be set to <code>earliest</code> for this topic.
+     * <p>
+     * The provided {@link ProcessorSupplier} will be used to create a processor for all messages received
+     * from the given topic. This processor should contain logic to keep the {@link StateStore} up-to-date.
+     *
+     * @param storeBuilder          user defined store builder
+     * @param sourceName            name of the {@link SourceNode} that will be automatically added
+     * @param timestampExtractor    the stateless timestamp extractor used for this source,
+     *                              if not specified the default extractor defined in the configs will be used
+     * @param keyDeserializer       the {@link Deserializer} to deserialize keys with
+     * @param valueDeserializer     the {@link Deserializer} to deserialize values with
+     * @param topic                 the topic to source the data from
+     * @param processorName         the name of the {@link ProcessorSupplier}
+     * @param stateUpdateSupplier   the instance of {@link ProcessorSupplier}
+     * @return itself
+     * @throws TopologyException if the processor of state is already registered
+     */
+    public synchronized <KIn, VIn> Topology addReadOnlyStateStore(final StoreBuilder<?> storeBuilder,
+                                                                  final String sourceName,
+                                                                  final TimestampExtractor timestampExtractor,
+                                                                  final Deserializer<KIn> keyDeserializer,
+                                                                  final Deserializer<VIn> valueDeserializer,
+                                                                  final String topic,
+                                                                  final String processorName,
+                                                                  final ProcessorSupplier<KIn, VIn, Void, Void> stateUpdateSupplier) {
+        // -- disabling logging. We might want to print some logging.
+        storeBuilder.withLoggingDisabled();
+
+        internalTopologyBuilder.addSource(AutoOffsetReset.EARLIEST, sourceName, timestampExtractor, keyDeserializer, valueDeserializer, topic);
+        internalTopologyBuilder.addProcessor(processorName, stateUpdateSupplier, sourceName);
+        internalTopologyBuilder.addStateStore(storeBuilder, processorName);
+        internalTopologyBuilder.connectSourceStoreAndTopic(storeBuilder.name(), topic);
+
+        return this;
+    }
+
+    /**
+     * Adds a Read Only {@link StateStore} to the topology.

Review Comment:
   ```suggestion
        * Adds a read-only {@link StateStore} to the topology.
   ```



##########
clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java:
##########
@@ -54,7 +54,7 @@
  * The consumer configuration keys
  */
 public class ConsumerConfig extends AbstractConfig {
-    private static final ConfigDef CONFIG;
+    protected static final ConfigDef CONFIG;

Review Comment:
   Why this change? Also, what is all the `ReverseXxx` code below? Did you accidentally add something that does not belong to this PR?



##########
streams/src/main/java/org/apache/kafka/streams/Topology.java:
##########
@@ -737,6 +737,85 @@ public synchronized Topology addStateStore(final StoreBuilder<?> storeBuilder,
         return this;
     }
 
+    /**
+     * Adds a read-only {@link StateStore} to the topology.
+     * <p>
+     * A read-only StateStore can use any compacted topic as a changelog. The <code>auto.offset.reset</code> property

Review Comment:
   ```suggestion
        * A read-only {@link StateStore} can use any compacted topic as a changelog. The <code>auto.offset.reset</code> property
   ```
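
For readers unfamiliar with the idea behind this JavaDoc sentence, here is a minimal sketch in plain Java. It is not Kafka Streams code and not the PR's implementation. It only illustrates why reading a compacted topic from the earliest offset is enough to rebuild a store. The class `ReadOnlyStoreSketch`, the `Entry` type, and the `replay` method are hypothetical names, and treating a `null` value as a delete (tombstone) is an assumption about how the user-supplied processor would update the store.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ReadOnlyStoreSketch {

    /** One changelog entry (hypothetical type); a null value is treated as a delete. */
    static final class Entry {
        final String key;
        final String value;

        Entry(String key, String value) {
            this.key = key;
            this.value = value;
        }
    }

    /**
     * Replays the changelog from its first entry, as a consumer reset to
     * "earliest" would on a fresh start, and applies each entry to the store.
     */
    static Map<String, String> replay(List<Entry> changelog) {
        Map<String, String> store = new HashMap<>();
        for (Entry e : changelog) {
            if (e.value == null) {
                store.remove(e.key);       // assumed tombstone handling
            } else {
                store.put(e.key, e.value); // the latest value per key wins
            }
        }
        return store;
    }

    public static void main(String[] args) {
        List<Entry> changelog = new ArrayList<>();
        changelog.add(new Entry("a", "1"));
        changelog.add(new Entry("b", "2"));
        changelog.add(new Entry("a", "3"));  // overwrites a=1
        changelog.add(new Entry("b", null)); // deletes b
        System.out.println(replay(changelog)); // prints {a=3}
    }
}
```

Because only the latest value per key matters, replaying the compacted topic gives the same store contents as replaying the full history. That is why the store needs no changelog topic of its own.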



##########
streams/src/main/java/org/apache/kafka/streams/Topology.java:
##########
@@ -737,6 +737,85 @@ public synchronized Topology addStateStore(final StoreBuilder<?> storeBuilder,
         return this;
     }
 
+    /**
+     * Adds a read-only {@link StateStore} to the topology.
+     * <p>
+     * A read-only StateStore can use any compacted topic as a changelog. The <code>auto.offset.reset</code> property
+     * will be set to <code>earliest</code> for this topic.
+     * <p>
+     * The provided {@link ProcessorSupplier} will be used to create a processor for all messages received
+     * from the given topic. This processor should contain logic to keep the {@link StateStore} up-to-date.
+     *
+     * @param storeBuilder          user defined store builder
+     * @param sourceName            name of the {@link SourceNode} that will be automatically added
+     * @param timestampExtractor    the stateless timestamp extractor used for this source,
+     *                              if not specified the default extractor defined in the configs will be used
+     * @param keyDeserializer       the {@link Deserializer} to deserialize keys with
+     * @param valueDeserializer     the {@link Deserializer} to deserialize values with
+     * @param topic                 the topic to source the data from
+     * @param processorName         the name of the {@link ProcessorSupplier}
+     * @param stateUpdateSupplier   the instance of {@link ProcessorSupplier}
+     * @return itself
+     * @throws TopologyException if the processor of state is already registered
+     */
+    public synchronized <KIn, VIn> Topology addReadOnlyStateStore(final StoreBuilder<?> storeBuilder,
+                                                                  final String sourceName,
+                                                                  final TimestampExtractor timestampExtractor,
+                                                                  final Deserializer<KIn> keyDeserializer,
+                                                                  final Deserializer<VIn> valueDeserializer,
+                                                                  final String topic,
+                                                                  final String processorName,
+                                                                  final ProcessorSupplier<KIn, VIn, Void, Void> stateUpdateSupplier) {
+        // -- disabling logging. We might want to print some logging.
+        storeBuilder.withLoggingDisabled();
+
+        internalTopologyBuilder.addSource(AutoOffsetReset.EARLIEST, sourceName, timestampExtractor, keyDeserializer, valueDeserializer, topic);
+        internalTopologyBuilder.addProcessor(processorName, stateUpdateSupplier, sourceName);
+        internalTopologyBuilder.addStateStore(storeBuilder, processorName);
+        internalTopologyBuilder.connectSourceStoreAndTopic(storeBuilder.name(), topic);
+
+        return this;
+    }
+
+    /**
+     * Adds a Read Only {@link StateStore} to the topology.
+     *

Review Comment:
   ```suggestion
        * <p>
   ```



##########
streams/src/main/java/org/apache/kafka/streams/Topology.java:
##########
@@ -737,6 +737,85 @@ public synchronized Topology addStateStore(final StoreBuilder<?> storeBuilder,
         return this;
     }
 
+    /**
+     * Adds a read-only {@link StateStore} to the topology.
+     * <p>
+     * A read-only StateStore can use any compacted topic as a changelog. The <code>auto.offset.reset</code> property
+     * will be set to <code>earliest</code> for this topic.
+     * <p>
+     * The provided {@link ProcessorSupplier} will be used to create a processor for all messages received
+     * from the given topic. This processor should contain logic to keep the {@link StateStore} up-to-date.
+     *
+     * @param storeBuilder          user defined store builder
+     * @param sourceName            name of the {@link SourceNode} that will be automatically added
+     * @param timestampExtractor    the stateless timestamp extractor used for this source,
+     *                              if not specified the default extractor defined in the configs will be used
+     * @param keyDeserializer       the {@link Deserializer} to deserialize keys with
+     * @param valueDeserializer     the {@link Deserializer} to deserialize values with
+     * @param topic                 the topic to source the data from
+     * @param processorName         the name of the {@link ProcessorSupplier}
+     * @param stateUpdateSupplier   the instance of {@link ProcessorSupplier}
+     * @return itself
+     * @throws TopologyException if the processor of state is already registered
+     */
+    public synchronized <KIn, VIn> Topology addReadOnlyStateStore(final StoreBuilder<?> storeBuilder,
+                                                                  final String sourceName,
+                                                                  final TimestampExtractor timestampExtractor,
+                                                                  final Deserializer<KIn> keyDeserializer,
+                                                                  final Deserializer<VIn> valueDeserializer,
+                                                                  final String topic,
+                                                                  final String processorName,
+                                                                  final ProcessorSupplier<KIn, VIn, Void, Void> stateUpdateSupplier) {
+        // -- disabling logging. We might want to print some logging.
+        storeBuilder.withLoggingDisabled();
+
+        internalTopologyBuilder.addSource(AutoOffsetReset.EARLIEST, sourceName, timestampExtractor, keyDeserializer, valueDeserializer, topic);
+        internalTopologyBuilder.addProcessor(processorName, stateUpdateSupplier, sourceName);
+        internalTopologyBuilder.addStateStore(storeBuilder, processorName);
+        internalTopologyBuilder.connectSourceStoreAndTopic(storeBuilder.name(), topic);
+
+        return this;
+    }
+
+    /**
+     * Adds a Read Only {@link StateStore} to the topology.
+     *
+     * A Read Only StateStore can use any compacted topic as a changelog. The <code>auto.offset.reset</code> property

Review Comment:
   ```suggestion
        * A read-only {@link StateStore} can use any compacted topic as a changelog. The <code>auto.offset.reset</code> property
   ```


