Posted to jira@kafka.apache.org by "vcrfxia (via GitHub)" <gi...@apache.org> on 2023/04/13 05:58:02 UTC

[GitHub] [kafka] vcrfxia opened a new pull request, #13554: KAFKA-14834: [7/N] Update VersionedKeyValueStore#put() to return boolean

vcrfxia opened a new pull request, #13554:
URL: https://github.com/apache/kafka/pull/13554

   This PR updates the return type of `VersionedKeyValueStore#put(...)` from `void` to `boolean`, where the boolean indicates whether the record that was put is the latest record for its key. As part of this change, `VersionedBytesStore` introduces its own `put(key, value, timestamp)` method to avoid a method signature conflict with the existing `put(key, value)` method from `KeyValueStore<Bytes, byte[]>`, which has a `void` return type. As a result, the previously added `NullableValueAndTimestampSerde` class is no longer needed, so it has also been removed in this PR as cleanup.
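
The signature conflict described above can be illustrated with a minimal sketch. All type names here (`PlainStore`, `VersionedStoreSketch`, `InMemoryVersionedStoreSketch`) are hypothetical stand-ins, not the Kafka classes. Rejecting the timestamp-less `put` and treating an equal timestamp as "latest" are assumptions made only for this example.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for KeyValueStore<Bytes, byte[]>, whose put(key, value) returns void.
interface PlainStore {
    void put(String key, byte[] value);
}

// Hypothetical stand-in for VersionedBytesStore. Declaring
//     boolean put(String key, byte[] value);
// here would be a compile error: it has the same signature as the inherited
// method but an incompatible return type. The extra timestamp parameter
// makes this a separate overload instead.
interface VersionedStoreSketch extends PlainStore {
    boolean put(String key, byte[] value, long timestamp);
}

class InMemoryVersionedStoreSketch implements VersionedStoreSketch {
    // Tracks only the newest timestamp seen per key; a real store keeps history.
    private final Map<String, Long> latestTimestamp = new HashMap<>();

    @Override
    public void put(String key, byte[] value) {
        // Assumption for this sketch: the timestamp-less variant is rejected.
        throw new UnsupportedOperationException("a timestamp is required");
    }

    @Override
    public boolean put(String key, byte[] value, long timestamp) {
        Long current = latestTimestamp.get(key);
        // Assumption: a record with an equal timestamp counts as the latest.
        boolean isLatest = current == null || timestamp >= current;
        if (isLatest) {
            latestTimestamp.put(key, timestamp);
        }
        return isLatest;
    }

    public static void main(String[] args) {
        InMemoryVersionedStoreSketch store = new InMemoryVersionedStoreSketch();
        System.out.println(store.put("k", new byte[] {1}, 10L));
        System.out.println(store.put("k", new byte[] {2}, 5L));
    }
}
```

In this sketch the first call reports the record as the latest and the second, out-of-order call does not.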
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] mjsax commented on a diff in pull request #13554: KAFKA-14834: [7/N] Update VersionedKeyValueStore#put() to return validTo

Posted by "mjsax (via GitHub)" <gi...@apache.org>.
mjsax commented on code in PR #13554:
URL: https://github.com/apache/kafka/pull/13554#discussion_r1165998633


##########
streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredVersionedKeyValueStore.java:
##########
@@ -84,22 +86,37 @@
         extends MeteredKeyValueStore<K, ValueAndTimestamp<V>> {
 
         private final VersionedBytesStore inner;
+        private final Serde<V> rawValueSerde;
+        private StateSerdes<K, V> rawValueSerdes;
 
         MeteredVersionedKeyValueStoreInternal(final VersionedBytesStore inner,
                                               final String metricScope,
                                               final Time time,
                                               final Serde<K> keySerde,
-                                              final Serde<ValueAndTimestamp<V>> valueSerde) {
-            super(inner, metricScope, time, keySerde, valueSerde);
+                                              final Serde<V> valueSerde) {
+            super(
+                inner,
+                metricScope,
+                time,
+                keySerde,
+                valueSerde == null
+                    ? null
+                    : new ValueAndTimestampSerde<>(valueSerde)
+            );
             this.inner = inner;
+            this.rawValueSerde = valueSerde;
         }
 
-        @Override
-        public void put(final K key, final ValueAndTimestamp<V> value) {
-            if (value == null) {
-                throw new IllegalStateException("Versioned store requires timestamp associated with all puts, including tombstones/deletes");
+        public boolean put(final K key, final V value, final long timestamp) {
+            Objects.requireNonNull(key, "key cannot be null");
+            try {
+                final boolean isLatest = maybeMeasureLatency(() -> inner.put(keyBytes(key), rawValueSerdes.rawValue(value), timestamp), time, putSensor);

Review Comment:
   Ah, I missed this one. Nice. I was really wondering how your PR compiled :)





[GitHub] [kafka] mjsax commented on a diff in pull request #13554: KAFKA-14834: [7/N] Update VersionedKeyValueStore#put() to return validTo

Posted by "mjsax (via GitHub)" <gi...@apache.org>.
mjsax commented on code in PR #13554:
URL: https://github.com/apache/kafka/pull/13554#discussion_r1166012524


##########
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStore.java:
##########
@@ -837,6 +847,7 @@ private <T extends VersionedStoreSegment> void finishPut(
                     segment.put(key, segmentValue.serialize());
                 }
             }
+            return foundTs;

Review Comment:
   Again: should we return `SENTINEL_TIMESTAMP` directly?
   
   We could also skip the `return` here (and at the end of `else` block) and have a final `return foundTs` instead?





[GitHub] [kafka] mjsax commented on a diff in pull request #13554: KAFKA-14834: [7/N] Update VersionedKeyValueStore#put() to return boolean

Posted by "mjsax (via GitHub)" <gi...@apache.org>.
mjsax commented on code in PR #13554:
URL: https://github.com/apache/kafka/pull/13554#discussion_r1165926367


##########
streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredVersionedKeyValueStore.java:
##########
@@ -84,22 +86,37 @@
         extends MeteredKeyValueStore<K, ValueAndTimestamp<V>> {
 
         private final VersionedBytesStore inner;
+        private final Serde<V> rawValueSerde;

Review Comment:
   nit: why `raw`? We usually use `raw` when we have `byte[]` at hand.





[GitHub] [kafka] mjsax commented on a diff in pull request #13554: KAFKA-14834: [7/N] Update VersionedKeyValueStore#put() to return validTo

Posted by "mjsax (via GitHub)" <gi...@apache.org>.
mjsax commented on code in PR #13554:
URL: https://github.com/apache/kafka/pull/13554#discussion_r1165993027


##########
streams/src/main/java/org/apache/kafka/streams/state/VersionedKeyValueStore.java:
##########
@@ -41,6 +42,9 @@
  */
 public interface VersionedKeyValueStore<K, V> extends StateStore {
 
+    long PUT_RETURN_CODE_VALID_TO_UNDEFINED = -1L;
+    long PUT_RETURN_CODE_NOT_PUT = Long.MIN_VALUE;

Review Comment:
   Should both be `final` ?
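
On the question above: fields declared in a Java interface are implicitly `public static final`, so an explicit `final` would be redundant. A small sketch that checks this via reflection, using a hypothetical interface `PutReturnCodes` that mirrors the two constants in the diff (the names are illustrative only):

```java
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;

// Hypothetical interface mirroring the two constants in the diff above.
interface PutReturnCodes {
    long VALID_TO_UNDEFINED = -1L;
    long NOT_PUT = Long.MIN_VALUE;
}

class InterfaceConstantCheck {
    // Returns true if the named field of PutReturnCodes is public, static, and final.
    static boolean isConstant(String fieldName) {
        try {
            Field field = PutReturnCodes.class.getField(fieldName);
            int modifiers = field.getModifiers();
            return Modifier.isPublic(modifiers)
                && Modifier.isStatic(modifiers)
                && Modifier.isFinal(modifiers);
        } catch (NoSuchFieldException e) {
            throw new IllegalArgumentException("unknown field: " + fieldName, e);
        }
    }

    public static void main(String[] args) {
        System.out.println(isConstant("VALID_TO_UNDEFINED"));
        System.out.println(isConstant("NOT_PUT"));
    }
}
```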





[GitHub] [kafka] vcrfxia commented on a diff in pull request #13554: KAFKA-14834: [7/N] Update VersionedKeyValueStore#put() to return boolean

Posted by "vcrfxia (via GitHub)" <gi...@apache.org>.
vcrfxia commented on code in PR #13554:
URL: https://github.com/apache/kafka/pull/13554#discussion_r1165935761


##########
streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredVersionedKeyValueStore.java:
##########
@@ -84,22 +86,37 @@
         extends MeteredKeyValueStore<K, ValueAndTimestamp<V>> {
 
         private final VersionedBytesStore inner;
+        private final Serde<V> rawValueSerde;

Review Comment:
   `valueSerde` is already defined in the base class, and I couldn't come up with a better name 😆 Suggestions welcome!
   
   Thinking about it some more, the `valueSerde` defined in the base class is of type `Serde<ValueAndTimestamp<V>>` from the perspective of this class, while this one is `Serde<V>`, so maybe `unwrappedValueSerde` could be a better name? Or is that even more confusing? Hmm...





[GitHub] [kafka] mjsax commented on a diff in pull request #13554: KAFKA-14834: [7/N] Update VersionedKeyValueStore#put() to return validTo

Posted by "mjsax (via GitHub)" <gi...@apache.org>.
mjsax commented on code in PR #13554:
URL: https://github.com/apache/kafka/pull/13554#discussion_r1165996502


##########
streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredVersionedKeyValueStore.java:
##########
@@ -84,22 +86,37 @@
         extends MeteredKeyValueStore<K, ValueAndTimestamp<V>> {
 
         private final VersionedBytesStore inner;
+        private final Serde<V> rawValueSerde;

Review Comment:
   Maybe `plainValueSerde` ?





[GitHub] [kafka] vcrfxia commented on a diff in pull request #13554: KAFKA-14834: [7/N] Update VersionedKeyValueStore#put() to return boolean

Posted by "vcrfxia (via GitHub)" <gi...@apache.org>.
vcrfxia commented on code in PR #13554:
URL: https://github.com/apache/kafka/pull/13554#discussion_r1165975843


##########
streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredVersionedKeyValueStore.java:
##########
@@ -84,22 +86,37 @@
         extends MeteredKeyValueStore<K, ValueAndTimestamp<V>> {
 
         private final VersionedBytesStore inner;
+        private final Serde<V> rawValueSerde;
+        private StateSerdes<K, V> rawValueSerdes;
 
         MeteredVersionedKeyValueStoreInternal(final VersionedBytesStore inner,
                                               final String metricScope,
                                               final Time time,
                                               final Serde<K> keySerde,
-                                              final Serde<ValueAndTimestamp<V>> valueSerde) {
-            super(inner, metricScope, time, keySerde, valueSerde);
+                                              final Serde<V> valueSerde) {
+            super(
+                inner,
+                metricScope,
+                time,
+                keySerde,
+                valueSerde == null
+                    ? null
+                    : new ValueAndTimestampSerde<>(valueSerde)
+            );
             this.inner = inner;
+            this.rawValueSerde = valueSerde;
         }
 
-        @Override
-        public void put(final K key, final ValueAndTimestamp<V> value) {
-            if (value == null) {
-                throw new IllegalStateException("Versioned store requires timestamp associated with all puts, including tombstones/deletes");
+        public boolean put(final K key, final V value, final long timestamp) {
+            Objects.requireNonNull(key, "key cannot be null");
+            try {
+                final boolean isLatest = maybeMeasureLatency(() -> inner.put(keyBytes(key), rawValueSerdes.rawValue(value), timestamp), time, putSensor);

Review Comment:
   There's another existing implementation which takes `Supplier<T>` and passes the result through: https://github.com/apache/kafka/blob/f252c75bf3f2db93bf062bc7e49d3c79a06e9568/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java#L876
   
   It's the same one that's currently being used in the file to return `ValueAndTimestamp<V>` from `get(key)` in the method below this one.
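
A simplified sketch of why the call compiles: when a `Runnable` overload and a `Supplier<T>` overload both exist, a lambda whose body yields a value resolves to the `Supplier<T>` one. The class `LatencySketch` and its `measure` and `put` methods are hypothetical stand-ins, not the `StreamsMetricsImpl` API. Timing with `System.nanoTime()` instead of `Time` and a `Sensor` is an assumption made to keep the example self-contained.

```java
import java.util.function.Supplier;

class LatencySketch {
    // Elapsed time of the most recently measured action, in nanoseconds.
    static long lastElapsedNanos = -1L;

    // Overload for actions that produce no result.
    static void measure(Runnable action) {
        long start = System.nanoTime();
        try {
            action.run();
        } finally {
            lastElapsedNanos = System.nanoTime() - start;
        }
    }

    // Overload that times the action and passes its result through.
    static <T> T measure(Supplier<T> action) {
        long start = System.nanoTime();
        try {
            return action.get();
        } finally {
            lastElapsedNanos = System.nanoTime() - start;
        }
    }

    // Hypothetical put() that reports whether the record was the latest.
    static boolean put(String key, long timestamp) {
        return timestamp >= 0;
    }

    public static void main(String[] args) {
        // The lambda body yields a value, so the Supplier overload is selected.
        boolean isLatest = measure(() -> put("k", 10L));
        System.out.println(isLatest);
        // println returns void, so only the Runnable overload applies.
        measure(() -> System.out.println("no result"));
    }
}
```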





[GitHub] [kafka] mjsax commented on a diff in pull request #13554: KAFKA-14834: [7/N] Update VersionedKeyValueStore#put() to return boolean

Posted by "mjsax (via GitHub)" <gi...@apache.org>.
mjsax commented on code in PR #13554:
URL: https://github.com/apache/kafka/pull/13554#discussion_r1165957316


##########
streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredVersionedKeyValueStore.java:
##########
@@ -84,22 +86,37 @@
         extends MeteredKeyValueStore<K, ValueAndTimestamp<V>> {
 
         private final VersionedBytesStore inner;
+        private final Serde<V> rawValueSerde;
+        private StateSerdes<K, V> rawValueSerdes;
 
         MeteredVersionedKeyValueStoreInternal(final VersionedBytesStore inner,
                                               final String metricScope,
                                               final Time time,
                                               final Serde<K> keySerde,
-                                              final Serde<ValueAndTimestamp<V>> valueSerde) {
-            super(inner, metricScope, time, keySerde, valueSerde);
+                                              final Serde<V> valueSerde) {
+            super(
+                inner,
+                metricScope,
+                time,
+                keySerde,
+                valueSerde == null
+                    ? null
+                    : new ValueAndTimestampSerde<>(valueSerde)
+            );
             this.inner = inner;
+            this.rawValueSerde = valueSerde;
         }
 
-        @Override
-        public void put(final K key, final ValueAndTimestamp<V> value) {
-            if (value == null) {
-                throw new IllegalStateException("Versioned store requires timestamp associated with all puts, including tombstones/deletes");
+        public boolean put(final K key, final V value, final long timestamp) {
+            Objects.requireNonNull(key, "key cannot be null");
+            try {
+                final boolean isLatest = maybeMeasureLatency(() -> inner.put(keyBytes(key), rawValueSerdes.rawValue(value), timestamp), time, putSensor);

Review Comment:
   I am confused. I don't see a change to `StreamsMetricsImpl#maybeMeasureLatency` to return a boolean? Can we even use a `Runnable` as we now need a return type?





[GitHub] [kafka] mjsax commented on a diff in pull request #13554: KAFKA-14834: [7/N] Update VersionedKeyValueStore#put() to return validTo

Posted by "mjsax (via GitHub)" <gi...@apache.org>.
mjsax commented on code in PR #13554:
URL: https://github.com/apache/kafka/pull/13554#discussion_r1166012353


##########
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStore.java:
##########
@@ -825,7 +835,7 @@ private <T extends VersionedStoreSegment> void finishPut(
                     if (RocksDBVersionedStoreSegmentValueFormatter.getNextTimestamp(rawSegmentValue) == timestamp) {
                         // next timestamp equal to put() timestamp already represents a tombstone,
                         // so no additional insertion is needed in this case
-                        return;
+                        return foundTs;

Review Comment:
   Same: should we return `SENTINEL_TIMESTAMP` directly?





[GitHub] [kafka] mjsax commented on a diff in pull request #13554: KAFKA-14834: [7/N] Update VersionedKeyValueStore#put() to return validTo

Posted by "mjsax (via GitHub)" <gi...@apache.org>.
mjsax commented on code in PR #13554:
URL: https://github.com/apache/kafka/pull/13554#discussion_r1166009780


##########
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStore.java:
##########
@@ -641,6 +642,8 @@ private <T extends VersionedStoreSegment> PutStatus maybePutToLatestValueStore(
                     versionedStoreClient.deleteLatestValue(key);
                 }
                 return new PutStatus(true, foundTs);

Review Comment:
   Seems `foundTs` should always be `SENTINEL_TIMESTAMP` here? Should we pass in `SENTINEL_TIMESTAMP` directly to make it more explicit?
   
   For this case, we might also not need to introduce `latestValueStoreTimestamp` ?







[GitHub] [kafka] mjsax merged pull request #13554: KAFKA-14834: [7/N] Update VersionedKeyValueStore#put() to return validTo

Posted by "mjsax (via GitHub)" <gi...@apache.org>.
mjsax merged PR #13554:
URL: https://github.com/apache/kafka/pull/13554

