You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by "vcrfxia (via GitHub)" <gi...@apache.org> on 2023/02/18 00:08:23 UTC

[GitHub] [kafka] vcrfxia commented on a diff in pull request #13274: KAFKA-14491: [13/N] Add versioned store builder and materializer

vcrfxia commented on code in PR #13274:
URL: https://github.com/apache/kafka/pull/13274#discussion_r1110456636


##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KeyValueStoreMaterializer.java:
##########
@@ -48,20 +55,30 @@ public StoreBuilder<TimestampedKeyValueStore<K, V>> materialize() {
             }
         }
 
-        final StoreBuilder<TimestampedKeyValueStore<K, V>> builder = Stores.timestampedKeyValueStoreBuilder(
-            supplier,
-            materialized.keySerde(),
-            materialized.valueSerde());
+        final StoreBuilder<?> builder;
+        if (supplier instanceof VersionedBytesStoreSupplier) {
+            builder = new VersionedKeyValueStoreBuilder<>(
+                (VersionedBytesStoreSupplier) supplier,
+                materialized.keySerde(),
+                materialized.valueSerde(),
+                Time.SYSTEM);
+        } else {
+            builder = Stores.timestampedKeyValueStoreBuilder(
+                supplier,
+                materialized.keySerde(),
+                materialized.valueSerde());
+        }
 
         if (materialized.loggingEnabled()) {
             builder.withLoggingEnabled(materialized.logConfig());
         } else {
             builder.withLoggingDisabled();
         }
 
-        if (materialized.cachingEnabled()) {
+        // versioned stores do not support caching

Review Comment:
   Regardless of whether the `Materialized` instance passed for a versioned store has caching enabled or not, the resulting store will not have caching enabled as versioned stores don't support caching. This could be confusing to users but I don't think we have a good way around it since `Materialized` instances default to having caching enabled. If `Materialized` defaulted to caching disabled, then we could throw an error if `withCachingEnabled()` is called for `Materialized` instances with versioned suppliers, but given that the default is already to have caching enabled, it seems harsh/inconvenient to require users to explicitly call `withCachingDisabled()` in order to use versioned stores.
   
   I think the most we should do is log a warning. Curious to hear other opinions.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org