Posted to jira@kafka.apache.org by "vcrfxia (via GitHub)" <gi...@apache.org> on 2023/03/23 18:49:03 UTC

[GitHub] [kafka] vcrfxia opened a new pull request, #13442: KAFKA-14491: [20/N] Add public-facing methods for versioned stores

vcrfxia opened a new pull request, #13442:
URL: https://github.com/apache/kafka/pull/13442

   (This PR should not be merged until https://github.com/apache/kafka/pull/13364 and https://github.com/apache/kafka/pull/13431 have been merged, so that the feature implementation is complete before the interfaces are exposed to users.)
   
   Until this PR, all the code added for [KIP-889](https://cwiki.apache.org/confluence/display/KAFKA/KIP-889%3A+Versioned+State+Stores) to introduce versioned stores to Kafka Streams has been accessible from internal packages only. This PR exposes the stores through public methods in `Stores.java` and also updates `TopologyTestDriver`.
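
To make the semantics of the `historyRetention` parameter concrete, here is a minimal, self-contained sketch. It is a toy in-memory model written for illustration, not the Kafka Streams implementation: the class `ToyVersionedStore`, its `boolean`-returning `put`, and its bare-value `get` are hypothetical names and shapes. It only mirrors what the Javadoc in this PR states: the name must be non-null, the retention must be non-negative, a timestamp bound older than the history retention returns no data, and the same value acts as the grace period for out-of-order writes. Timestamps are assumed to be non-negative.

```java
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.TreeMap;

// Hypothetical toy model for illustration only; NOT the Kafka Streams implementation.
final class ToyVersionedStore<K, V> {
    private final long historyRetentionMs;
    // key -> (record timestamp -> value), sorted so the version valid at a given time can be found
    private final Map<K, TreeMap<Long, V>> versions = new HashMap<>();
    // largest timestamp seen so far; timestamps are assumed to be non-negative
    private long observedStreamTime = 0L;

    ToyVersionedStore(final String name, final Duration historyRetention) {
        Objects.requireNonNull(name, "name cannot be null");
        // Duration#toMillis throws ArithmeticException if the value overflows a long
        final long ms = historyRetention.toMillis();
        if (ms < 0L) {
            throw new IllegalArgumentException("historyRetention cannot be negative");
        }
        this.historyRetentionMs = ms;
    }

    /** Returns false when the write is older than the grace period and is dropped. */
    boolean put(final K key, final V value, final long timestamp) {
        observedStreamTime = Math.max(observedStreamTime, timestamp);
        if (timestamp < observedStreamTime - historyRetentionMs) {
            return false;
        }
        versions.computeIfAbsent(key, k -> new TreeMap<>()).put(timestamp, value);
        return true;
    }

    /** Returns the value that was current at asOfTimestamp, or null if none is available. */
    V get(final K key, final long asOfTimestamp) {
        if (asOfTimestamp < observedStreamTime - historyRetentionMs) {
            return null; // the timestamp bound is older than the history retention
        }
        final TreeMap<Long, V> history = versions.get(key);
        if (history == null) {
            return null;
        }
        final Map.Entry<Long, V> entry = history.floorEntry(asOfTimestamp);
        return entry == null ? null : entry.getValue();
    }
}
```

With a retention of 100 ms and writes at timestamps 10, 50, and 200, a query as of timestamp 60 returns nothing in this model (60 is more than 100 ms behind 200), while a query as of 150 still returns the version written at 50.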
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] vcrfxia commented on a diff in pull request #13442: KAFKA-14491: [20/N] Add public-facing methods for versioned stores

Posted by "vcrfxia (via GitHub)" <gi...@apache.org>.
vcrfxia commented on code in PR #13442:
URL: https://github.com/apache/kafka/pull/13442#discussion_r1151109249


##########
streams/src/main/java/org/apache/kafka/streams/state/Stores.java:
##########
@@ -110,6 +116,73 @@ public static KeyValueBytesStoreSupplier persistentTimestampedKeyValueStore(fina
         return new RocksDbKeyValueBytesStoreSupplier(name, true);
     }
 
+    /**
+     * Create a persistent versioned key-value store {@link VersionedBytesStoreSupplier}.
+     * <p>
+     * This store supplier can be passed into a
+     * {@link #versionedKeyValueStoreBuilder(VersionedBytesStoreSupplier, Serde, Serde)}.
+     *
+     * @param name             name of the store (cannot be {@code null})
+     * @param historyRetention length of time that old record versions are available for query
+     *                         (cannot be negative). If a timestamp bound provided to
+     *                         {@link VersionedKeyValueStore#get(Object, long)} is older than this
+     *                         specified history retention, then the get operation will not return data.
+     *                         This parameter also determines the "grace period" after which
+     *                         out-of-order writes will no longer be accepted.
+     * @return an instance of {@link VersionedBytesStoreSupplier}
+     * @throws IllegalArgumentException if {@code historyRetention} can't be represented as {@code long milliseconds}
+     */
+    public static VersionedBytesStoreSupplier persistentVersionedKeyValueStore(final String name,
+                                                                               final Duration historyRetention) {
+        Objects.requireNonNull(name, "name cannot be null");
+        final String hrMsgPrefix = prepareMillisCheckFailMsgPrefix(historyRetention, "historyRetention");
+        final long historyRetentionMs = validateMillisecondDuration(historyRetention, hrMsgPrefix);
+        if (historyRetentionMs < 0L) {
+            throw new IllegalArgumentException("historyRetention cannot be negative");
+        }
+        return new RocksDbVersionedKeyValueBytesStoreSupplier(name, historyRetentionMs);
+    }
+
+    /**
+     * Create a persistent versioned key-value store {@link VersionedBytesStoreSupplier}.
+     * <p>
+     * This store supplier can be passed into a
+     * {@link #versionedKeyValueStoreBuilder(VersionedBytesStoreSupplier, Serde, Serde)}.
+     *
+     * @param name             name of the store (cannot be {@code null})
+     * @param historyRetention length of time that old record versions are available for query
+     *                         (cannot be negative). If a timestamp bound provided to
+     *                         {@link VersionedKeyValueStore#get(Object, long)} is older than this
+     *                         specified history retention, then the get operation will not return data.
+     *                         This parameter also determines the "grace period" after which
+     *                         out-of-order writes will no longer be accepted.
+     * @param segmentInterval  size of segments for storing old record versions (must be positive). Old record versions
+     *                         for the same key in a single segment are stored (updated and accessed) together.
+     *                         The only impact of this parameter is performance. If segments are large
+     *                         and a workload results in many record versions for the same key being collected
+     *                         in a single segment, performance may degrade as a result. On the other hand,
+     *                         reads and out-of-order writes which access older segments may slow down if

Review Comment:
   Yeah, that's correct. Updated.




[GitHub] [kafka] mjsax commented on a diff in pull request #13442: KAFKA-14491: [20/N] Add public-facing methods for versioned stores

Posted by "mjsax (via GitHub)" <gi...@apache.org>.
mjsax commented on code in PR #13442:
URL: https://github.com/apache/kafka/pull/13442#discussion_r1150036447


##########
streams/src/main/java/org/apache/kafka/streams/state/Stores.java:
##########
@@ -110,6 +116,73 @@ public static KeyValueBytesStoreSupplier persistentTimestampedKeyValueStore(fina
         return new RocksDbKeyValueBytesStoreSupplier(name, true);
     }
 
+    /**
+     * Create a persistent versioned key-value store {@link VersionedBytesStoreSupplier}.
+     * <p>
+     * This store supplier can be passed into a
+     * {@link #versionedKeyValueStoreBuilder(VersionedBytesStoreSupplier, Serde, Serde)}.
+     *
+     * @param name             name of the store (cannot be {@code null})
+     * @param historyRetention length of time that old record versions are available for query
+     *                         (cannot be negative). If a timestamp bound provided to
+     *                         {@link VersionedKeyValueStore#get(Object, long)} is older than this
+     *                         specified history retention, then the get operation will not return data.
+     *                         This parameter also determines the "grace period" after which
+     *                         out-of-order writes will no longer be accepted.
+     * @return an instance of {@link VersionedBytesStoreSupplier}
+     * @throws IllegalArgumentException if {@code historyRetention} can't be represented as {@code long milliseconds}
+     */
+    public static VersionedBytesStoreSupplier persistentVersionedKeyValueStore(final String name,
+                                                                               final Duration historyRetention) {
+        Objects.requireNonNull(name, "name cannot be null");
+        final String hrMsgPrefix = prepareMillisCheckFailMsgPrefix(historyRetention, "historyRetention");
+        final long historyRetentionMs = validateMillisecondDuration(historyRetention, hrMsgPrefix);
+        if (historyRetentionMs < 0L) {
+            throw new IllegalArgumentException("historyRetention cannot be negative");
+        }
+        return new RocksDbVersionedKeyValueBytesStoreSupplier(name, historyRetentionMs);
+    }
+
+    /**
+     * Create a persistent versioned key-value store {@link VersionedBytesStoreSupplier}.
+     * <p>
+     * This store supplier can be passed into a
+     * {@link #versionedKeyValueStoreBuilder(VersionedBytesStoreSupplier, Serde, Serde)}.
+     *
+     * @param name             name of the store (cannot be {@code null})
+     * @param historyRetention length of time that old record versions are available for query
+     *                         (cannot be negative). If a timestamp bound provided to
+     *                         {@link VersionedKeyValueStore#get(Object, long)} is older than this
+     *                         specified history retention, then the get operation will not return data.
+     *                         This parameter also determines the "grace period" after which
+     *                         out-of-order writes will no longer be accepted.
+     * @param segmentInterval  size of segments for storing old record versions (must be positive). Old record versions
+     *                         for the same key in a single segment are stored (updated and accessed) together.
+     *                         The only impact of this parameter is performance. If segments are large
+     *                         and a workload results in many record versions for the same key being collected
+     *                         in a single segment, performance may degrade as a result. On the other hand,
+     *                         reads and out-of-order writes which access older segments may slow down if

Review Comment:
   `[history] reads` (regular "most recent" reads are not affected, right?)
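
One way to picture the distinction raised in this comment is the toy model below. It rests on assumptions that this thread does not spell out: that the latest value per key is held apart from older versions, and that an older version is filed into a segment chosen by dividing the timestamp at which it was superseded by `segmentInterval`. The class `ToySegmentedStore`, its methods, and the `segmentsTouched` counter are hypothetical and exist only for illustration. The sketch handles in-order writes with non-negative timestamps only.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical toy model for illustration only; NOT the Kafka Streams implementation.
final class ToySegmentedStore {
    private static final class Version {
        final String value;
        final long validFrom;

        Version(final String value, final long validFrom) {
            this.value = value;
            this.validFrom = validFrom;
        }
    }

    private final long segmentIntervalMs;
    // latest version per key, kept apart from the segments (an assumption of this sketch)
    private final Map<String, Version> latest = new HashMap<>();
    // segment id -> key -> (validFrom -> value) for superseded versions
    private final TreeMap<Long, Map<String, TreeMap<Long, String>>> segments = new TreeMap<>();
    // counts how many segments reads have inspected so far
    int segmentsTouched = 0;

    ToySegmentedStore(final long segmentIntervalMs) {
        if (segmentIntervalMs <= 0L) {
            throw new IllegalArgumentException("segmentInterval must be positive");
        }
        this.segmentIntervalMs = segmentIntervalMs;
    }

    // Assumes in-order writes: each timestamp is >= the previous one for the same key.
    void put(final String key, final String value, final long timestamp) {
        final Version old = latest.put(key, new Version(value, timestamp));
        if (old != null) {
            // the old version stops being valid at `timestamp`, which picks its segment
            final long segmentId = timestamp / segmentIntervalMs;
            segments.computeIfAbsent(segmentId, id -> new HashMap<>())
                    .computeIfAbsent(key, k -> new TreeMap<>())
                    .put(old.validFrom, old.value);
        }
    }

    // "Most recent" read: never looks at a segment.
    String getLatest(final String key) {
        final Version v = latest.get(key);
        return v == null ? null : v.value;
    }

    // History read: falls back to scanning segments from newest to oldest.
    String get(final String key, final long asOfTimestamp) {
        final Version v = latest.get(key);
        if (v == null) {
            return null;
        }
        if (asOfTimestamp >= v.validFrom) {
            return v.value;
        }
        for (final Map<String, TreeMap<Long, String>> segment : segments.descendingMap().values()) {
            segmentsTouched++;
            final TreeMap<Long, String> history = segment.get(key);
            if (history == null) {
                continue;
            }
            final Map.Entry<Long, String> entry = history.floorEntry(asOfTimestamp);
            if (entry != null) {
                return entry.getValue();
            }
        }
        return null;
    }
}
```

In this model a smaller `segmentInterval` spreads old versions over more segments, so a history read may inspect more of them, while `getLatest` does the same amount of work regardless of the interval.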




[GitHub] [kafka] mjsax merged pull request #13442: KAFKA-14491: [20/N] Add public-facing methods for versioned stores

Posted by "mjsax (via GitHub)" <gi...@apache.org>.
mjsax merged PR #13442:
URL: https://github.com/apache/kafka/pull/13442

