Posted to jira@kafka.apache.org by "vcrfxia (via GitHub)" <gi...@apache.org> on 2023/02/17 23:59:37 UTC

[GitHub] [kafka] vcrfxia opened a new pull request, #13274: KAFKA-14491: [13/N] Add versioned store builder and materializer

vcrfxia opened a new pull request, #13274:
URL: https://github.com/apache/kafka/pull/13274

   (This PR is stacked on https://github.com/apache/kafka/pull/13251, https://github.com/apache/kafka/pull/13252, and https://github.com/apache/kafka/pull/13264. As a result, the first three commits on this PR do not need to be reviewed separately.)
   
   This PR introduces `VersionedKeyValueStoreBuilder` for building the new versioned stores introduced in [KIP-889](https://cwiki.apache.org/confluence/display/KAFKA/KIP-889%3A+Versioned+State+Stores), analogous to the existing `TimestampedKeyValueStoreBuilder` for building timestamped stores. This PR also updates the existing KTable store materializer class to materialize versioned stores in addition to timestamped stores. As part of this change, the materializer is renamed from `TimestampedKeyValueStoreMaterializer` to simply `KeyValueStoreMaterializer`.
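   The following is a minimal, self-contained sketch of the supplier-based dispatch described above. It assumes simplified stand-in types: `BytesSupplier`, `VersionedSupplier`, and `chooseBuilder` are hypothetical names that mirror `KeyValueBytesStoreSupplier`, `VersionedBytesStoreSupplier`, and the branch in `KeyValueStoreMaterializer`. They are not the Kafka Streams API, and the sketch returns the chosen builder's name instead of constructing a builder.

   ```java
   import java.util.Objects;

   public class MaterializerSketch {

       // Hypothetical stand-in for KeyValueBytesStoreSupplier.
       interface BytesSupplier {
           String name();
       }

       // Hypothetical stand-in for VersionedBytesStoreSupplier, modeled as a sub-type of the key-value supplier.
       interface VersionedSupplier extends BytesSupplier {
           long historyRetentionMs();
       }

       // Hypothetical helper: reports which builder the materializer would pick for this supplier.
       static String chooseBuilder(final BytesSupplier supplier) {
           Objects.requireNonNull(supplier, "supplier");
           // Check for the more specific versioned type; every other supplier falls back to the timestamped builder.
           if (supplier instanceof VersionedSupplier) {
               return "VersionedKeyValueStoreBuilder";
           }
           return "TimestampedKeyValueStoreBuilder";
       }

       public static void main(final String[] args) {
           final BytesSupplier plain = () -> "plain-store";
           final VersionedSupplier versioned = new VersionedSupplier() {
               @Override public String name() { return "versioned-store"; }
               @Override public long historyRetentionMs() { return 600_000L; }
           };
           System.out.println(chooseBuilder(plain));     // TimestampedKeyValueStoreBuilder
           System.out.println(chooseBuilder(versioned)); // VersionedKeyValueStoreBuilder
       }
   }
   ```

   Dispatching on the supplier's runtime type lets a single `Materialized` entry point serve both store kinds, which is why the materializer no longer needs "Timestamped" in its name.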
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] vcrfxia commented on a diff in pull request #13274: KAFKA-14491: [13/N] Add versioned store builder and materializer

Posted by "vcrfxia (via GitHub)" <gi...@apache.org>.
vcrfxia commented on code in PR #13274:
URL: https://github.com/apache/kafka/pull/13274#discussion_r1110457053


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractReadOnlyDecorator.java:
##########
@@ -68,6 +70,8 @@ public void close() {
     static StateStore getReadOnlyStore(final StateStore global) {
         if (global instanceof TimestampedKeyValueStore) {
             return new TimestampedKeyValueStoreReadOnlyDecorator<>((TimestampedKeyValueStore<?, ?>) global);
+        } else if (global instanceof VersionedKeyValueStore) {

Review Comment:
   This addition and the analogous one in `AbstractReadWriteDecorator` are necessary so that `ProcessorContextImpl#getStateStore()` can correctly return versioned stores.
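   Here is a simplified sketch of the `instanceof` dispatch in `getReadOnlyStore()`. It assumes hypothetical stand-in interfaces (`Store`, `VersionedStore`) and a hypothetical `ReadOnlyVersioned` wrapper rather than the real Kafka Streams classes. Only the versioned branch is modeled: reads delegate to the wrapped store, and writes throw.

   ```java
   public class ReadOnlyDecoratorSketch {

       // Hypothetical stand-in for StateStore.
       interface Store {
           String name();
       }

       // Hypothetical stand-in for a versioned key-value store. In this sketch it extends only the
       // base type, so it needs its own branch in the dispatch below to get a read-only view.
       interface VersionedStore extends Store {
           String get(String key, long asOfTimestamp);
           void put(String key, String value, long timestamp);
       }

       // Read-only view of a versioned store: reads delegate, writes are rejected.
       static final class ReadOnlyVersioned implements VersionedStore {
           private final VersionedStore inner;

           ReadOnlyVersioned(final VersionedStore inner) {
               this.inner = inner;
           }

           @Override public String name() { return inner.name(); }

           @Override public String get(final String key, final long asOfTimestamp) {
               return inner.get(key, asOfTimestamp);
           }

           @Override public void put(final String key, final String value, final long timestamp) {
               throw new UnsupportedOperationException("Store " + name() + " is read-only");
           }
       }

       // Dispatch on the runtime type, mirroring the shape of the instanceof chain in the diff.
       static Store getReadOnlyStore(final Store store) {
           if (store instanceof VersionedStore) {
               return new ReadOnlyVersioned((VersionedStore) store);
           }
           // Other store types are returned unchanged in this sketch.
           return store;
       }

       public static void main(final String[] args) {
           final VersionedStore backing = new VersionedStore() {
               @Override public String name() { return "versioned-store"; }
               @Override public String get(final String key, final long asOfTimestamp) { return "v@" + asOfTimestamp; }
               @Override public void put(final String key, final String value, final long timestamp) { }
           };
           final VersionedStore readOnly = (VersionedStore) getReadOnlyStore(backing);
           System.out.println(readOnly.get("k", 42L)); // v@42
           try {
               readOnly.put("k", "v", 1L);
           } catch (final UnsupportedOperationException e) {
               System.out.println(e.getMessage()); // Store versioned-store is read-only
           }
       }
   }
   ```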





[GitHub] [kafka] vcrfxia commented on a diff in pull request #13274: KAFKA-14491: [13/N] Add versioned store builder and materializer

Posted by "vcrfxia (via GitHub)" <gi...@apache.org>.
vcrfxia commented on code in PR #13274:
URL: https://github.com/apache/kafka/pull/13274#discussion_r1110458455


##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/KeyValueStoreMaterializerTest.java:
##########
@@ -0,0 +1,245 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.processor.internals;
+
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.kstream.internals.InternalNameProvider;
+import org.apache.kafka.streams.kstream.internals.MaterializedInternal;
+import org.apache.kafka.streams.kstream.internals.KeyValueStoreMaterializer;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.StoreBuilder;
+import org.apache.kafka.streams.state.TimestampedKeyValueStore;
+import org.apache.kafka.streams.state.VersionedBytesStore;
+import org.apache.kafka.streams.state.VersionedBytesStoreSupplier;
+import org.apache.kafka.streams.state.VersionedKeyValueStore;
+import org.apache.kafka.streams.state.internals.CachingKeyValueStore;
+import org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStore;
+import org.apache.kafka.streams.state.internals.ChangeLoggingTimestampedKeyValueBytesStore;
+import org.apache.kafka.streams.state.internals.ChangeLoggingVersionedKeyValueBytesStore;
+import org.apache.kafka.streams.state.internals.InMemoryKeyValueStore;
+import org.apache.kafka.streams.state.internals.MeteredTimestampedKeyValueStore;
+import org.apache.kafka.streams.state.internals.MeteredVersionedKeyValueStore;
+import org.apache.kafka.streams.state.internals.WrappedStateStore;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.core.IsInstanceOf.instanceOf;
+import static org.hamcrest.core.IsNot.not;
+import static org.mockito.Mockito.when;
+
+@RunWith(MockitoJUnitRunner.StrictStubs.class)
+public class KeyValueStoreMaterializerTest {

Review Comment:
   This file is renamed from the existing `TimestampedKeyValueStoreMaterializerTest.java`, but I added enough new tests (and refactored the existing ones) that GitHub does not detect it as a rename.





[GitHub] [kafka] mjsax merged pull request #13274: KAFKA-14491: [13/N] Add versioned store builder and materializer

Posted by "mjsax (via GitHub)" <gi...@apache.org>.
mjsax merged PR #13274:
URL: https://github.com/apache/kafka/pull/13274




[GitHub] [kafka] vcrfxia commented on a diff in pull request #13274: KAFKA-14491: [13/N] Add versioned store builder and materializer

Posted by "vcrfxia (via GitHub)" <gi...@apache.org>.
vcrfxia commented on code in PR #13274:
URL: https://github.com/apache/kafka/pull/13274#discussion_r1110456636


##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KeyValueStoreMaterializer.java:
##########
@@ -48,20 +55,30 @@ public StoreBuilder<TimestampedKeyValueStore<K, V>> materialize() {
             }
         }
 
-        final StoreBuilder<TimestampedKeyValueStore<K, V>> builder = Stores.timestampedKeyValueStoreBuilder(
-            supplier,
-            materialized.keySerde(),
-            materialized.valueSerde());
+        final StoreBuilder<?> builder;
+        if (supplier instanceof VersionedBytesStoreSupplier) {
+            builder = new VersionedKeyValueStoreBuilder<>(
+                (VersionedBytesStoreSupplier) supplier,
+                materialized.keySerde(),
+                materialized.valueSerde(),
+                Time.SYSTEM);
+        } else {
+            builder = Stores.timestampedKeyValueStoreBuilder(
+                supplier,
+                materialized.keySerde(),
+                materialized.valueSerde());
+        }
 
         if (materialized.loggingEnabled()) {
             builder.withLoggingEnabled(materialized.logConfig());
         } else {
             builder.withLoggingDisabled();
         }
 
-        if (materialized.cachingEnabled()) {
+        // versioned stores do not support caching

Review Comment:
   Regardless of whether the `Materialized` instance passed for a versioned store has caching enabled, the resulting store will not have caching enabled, because versioned stores don't support caching. This could be confusing to users, but I don't think there's a good way around it, since `Materialized` instances default to caching enabled. If `Materialized` defaulted to caching disabled, we could throw an error when `withCachingEnabled()` is called on a `Materialized` instance with a versioned supplier. Given that the default is already caching enabled, though, it seems harsh and inconvenient to require users to explicitly call `withCachingDisabled()` in order to use versioned stores.
   
   I think the most we should do is log a warning. Curious to hear other opinions.
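   A sketch of the behavior discussed in this thread (ignore the caching request for versioned stores and log about it rather than throw), assuming a hypothetical `effectiveCaching` helper. It uses `java.util.logging` only to stay self-contained, and the log message wording is illustrative, not the text added in the PR.

   ```java
   import java.util.logging.Logger;

   public class CachingDecisionSketch {

       private static final Logger LOG = Logger.getLogger(CachingDecisionSketch.class.getName());

       // Hypothetical helper: decides whether caching is actually applied to the built store.
       // `cachingRequested` models Materialized#cachingEnabled(), which defaults to true.
       static boolean effectiveCaching(final String storeName,
                                       final boolean cachingRequested,
                                       final boolean versioned) {
           if (!cachingRequested) {
               return false;
           }
           if (versioned) {
               // Treat caching as a request the store may ignore: log at INFO instead of failing,
               // because a default Materialized already requests caching.
               LOG.info("Not enabling caching for store '" + storeName
                   + "' because versioned stores do not support caching.");
               return false;
           }
           return true;
       }

       public static void main(final String[] args) {
           System.out.println(effectiveCaching("ts-store", true, false));       // true
           System.out.println(effectiveCaching("versioned-store", true, true)); // false, plus an INFO log record
           System.out.println(effectiveCaching("ts-store", false, false));      // false
       }
   }
   ```

   Because the helper returns early when caching was not requested, no log record is emitted for a `Materialized` that already called `withCachingDisabled()`.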





[GitHub] [kafka] vcrfxia commented on a diff in pull request #13274: KAFKA-14491: [13/N] Add versioned store builder and materializer

Posted by "vcrfxia (via GitHub)" <gi...@apache.org>.
vcrfxia commented on code in PR #13274:
URL: https://github.com/apache/kafka/pull/13274#discussion_r1110458817


##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KeyValueStoreMaterializer.java:
##########
@@ -17,23 +17,30 @@
 package org.apache.kafka.streams.kstream.internals;
 
 import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.StoreBuilder;
 import org.apache.kafka.streams.state.Stores;
-import org.apache.kafka.streams.state.TimestampedKeyValueStore;
+import org.apache.kafka.streams.state.VersionedBytesStoreSupplier;
+import org.apache.kafka.streams.state.internals.TimestampedKeyValueStoreBuilder;
+import org.apache.kafka.streams.state.internals.VersionedKeyValueStoreBuilder;
 
-public class TimestampedKeyValueStoreMaterializer<K, V> {
+/**
+ * Materializes a key-value store as either a {@link TimestampedKeyValueStoreBuilder} or a
+ * {@link VersionedKeyValueStoreBuilder} depending on whether the store is versioned or not.
+ */
+public class KeyValueStoreMaterializer<K, V> {

Review Comment:
   While renaming this class in IntelliJ, I got a warning that a class with this name existed in a previous version of Kafka Streams. I think this is fine since it's an internal class, but wanted to call it out in case there are compatibility concerns I'm not aware of.





[GitHub] [kafka] mjsax commented on a diff in pull request #13274: KAFKA-14491: [13/N] Add versioned store builder and materializer

Posted by "mjsax (via GitHub)" <gi...@apache.org>.
mjsax commented on code in PR #13274:
URL: https://github.com/apache/kafka/pull/13274#discussion_r1123970487


##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KeyValueStoreMaterializer.java:
##########
@@ -48,20 +55,30 @@ public StoreBuilder<TimestampedKeyValueStore<K, V>> materialize() {
-        if (materialized.cachingEnabled()) {
+        // versioned stores do not support caching

Review Comment:
   I guess it can't hurt to log at INFO/WARN level about it?





[GitHub] [kafka] vcrfxia commented on a diff in pull request #13274: KAFKA-14491: [13/N] Add versioned store builder and materializer

Posted by "vcrfxia (via GitHub)" <gi...@apache.org>.
vcrfxia commented on code in PR #13274:
URL: https://github.com/apache/kafka/pull/13274#discussion_r1124151220


##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KeyValueStoreMaterializer.java:
##########
@@ -48,20 +55,30 @@ public StoreBuilder<TimestampedKeyValueStore<K, V>> materialize() {
-        if (materialized.cachingEnabled()) {
+        // versioned stores do not support caching

Review Comment:
   OK, I've added an INFO log statement for now, while we wait for additional opinions.





[GitHub] [kafka] mjsax commented on a diff in pull request #13274: KAFKA-14491: [13/N] Add versioned store builder and materializer

Posted by "mjsax (via GitHub)" <gi...@apache.org>.
mjsax commented on code in PR #13274:
URL: https://github.com/apache/kafka/pull/13274#discussion_r1123965019


##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KeyValueStoreMaterializer.java:
##########
@@ -48,20 +55,30 @@ public StoreBuilder<TimestampedKeyValueStore<K, V>> materialize() {
-        if (materialized.cachingEnabled()) {
+        // versioned stores do not support caching

Review Comment:
   Don't have a strong opinion. Pinging @guozhangwang and @vvcephei for input.





[GitHub] [kafka] vvcephei commented on a diff in pull request #13274: KAFKA-14491: [13/N] Add versioned store builder and materializer

Posted by "vvcephei (via GitHub)" <gi...@apache.org>.
vvcephei commented on code in PR #13274:
URL: https://github.com/apache/kafka/pull/13274#discussion_r1124670079


##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KeyValueStoreMaterializer.java:
##########
@@ -48,20 +55,30 @@ public StoreBuilder<TimestampedKeyValueStore<K, V>> materialize() {
-        if (materialized.cachingEnabled()) {
+        // versioned stores do not support caching

Review Comment:
   +1 on an INFO log (or no log at all).
   
   Caching is non-functional behavior, so it makes sense for a store that doesn't support it (yet) to simply treat it as a request and ignore it.





[GitHub] [kafka] vcrfxia commented on a diff in pull request #13274: KAFKA-14491: [13/N] Add versioned store builder and materializer

Posted by "vcrfxia (via GitHub)" <gi...@apache.org>.
vcrfxia commented on code in PR #13274:
URL: https://github.com/apache/kafka/pull/13274#discussion_r1110458195


##########
streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedKeyValueStoreBuilderTest.java:
##########
@@ -154,17 +154,34 @@ public void shouldThrowNullPointerIfInnerIsNull() {
     }
 
     @Test
-    public void shouldThrowNullPointerIfKeySerdeIsNull() {
-        assertThrows(NullPointerException.class, () -> new TimestampedKeyValueStoreBuilder<>(supplier, null, Serdes.String(), new MockTime()));
+    public void shouldNotThrowNullPointerIfKeySerdeIsNull() {

Review Comment:
   These test changes are not directly related to this PR. While I was looking here for inspiration for the new `VersionedKeyValueStoreBuilderTest.java` tests, I noticed that these tests were broken. The only reason they were throwing `NullPointerException`s is that the supplier was not being mocked to return a name and metrics scope. Once that mocking is added, passing a null key or value serde no longer throws an exception.
   
   I also added the same required mocking to `shouldThrowNullPointerIfTimeIsNull()` below, so that the test actually throws an NPE because time is null, rather than because the supplier returns null for its name or metrics scope.
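   A dependency-free sketch of why the original tests passed for the wrong reason. It assumes a hypothetical `Builder` that null-checks the supplier's name, metrics scope, and time but not the serdes. The anonymous `Supplier` whose methods return `null` plays the role of the unstubbed Mockito mock described above, and `npeMessage` is a hypothetical test helper.

   ```java
   import java.util.Objects;

   public class NullCheckTestSketch {

       // Hypothetical stand-in for a store supplier.
       interface Supplier {
           String name();
           String metricsScope();
       }

       // Hypothetical builder: requires name, metrics scope, and time, but tolerates null serdes.
       static final class Builder {
           Builder(final Supplier supplier, final Object keySerde, final Object valueSerde, final Object time) {
               Objects.requireNonNull(supplier, "supplier can't be null");
               Objects.requireNonNull(supplier.name(), "name can't be null");
               Objects.requireNonNull(supplier.metricsScope(), "metricsScope can't be null");
               Objects.requireNonNull(time, "time can't be null");
               // keySerde and valueSerde are intentionally not null-checked in this sketch.
           }
       }

       // Runs the action and returns the NPE message, or null if no NPE was thrown.
       static String npeMessage(final Runnable action) {
           try {
               action.run();
               return null;
           } catch (final NullPointerException e) {
               return e.getMessage();
           }
       }

       public static void main(final String[] args) {
           // Like an unstubbed mock: name() and metricsScope() return null.
           final Supplier unstubbed = new Supplier() {
               @Override public String name() { return null; }
               @Override public String metricsScope() { return null; }
           };
           final Supplier stubbed = new Supplier() {
               @Override public String name() { return "store"; }
               @Override public String metricsScope() { return "scope"; }
           };
           final Object time = new Object();

           // Null key serde, unstubbed supplier: the NPE comes from the name, not the serde.
           System.out.println(npeMessage(() -> new Builder(unstubbed, null, "valueSerde", time)));
           // Null key serde, stubbed supplier: no NPE, prints "null".
           System.out.println(npeMessage(() -> new Builder(stubbed, null, "valueSerde", time)));
           // Null time, stubbed supplier: the NPE now comes from the time check.
           System.out.println(npeMessage(() -> new Builder(stubbed, "keySerde", "valueSerde", null)));
       }
   }
   ```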


