You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by mj...@apache.org on 2018/10/16 03:56:38 UTC
[kafka] branch trunk updated: KAFKA-7080 and KAFKA-7222: Cleanup
overlapping KIP changes (#5804)
This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 2646781 KAFKA-7080 and KAFKA-7222: Cleanup overlapping KIP changes (#5804)
2646781 is described below
commit 2646781d3265c8269f35d39c199dfddf14d3c2a8
Author: Matthias J. Sax <mj...@apache.org>
AuthorDate: Mon Oct 15 20:56:30 2018 -0700
KAFKA-7080 and KAFKA-7222: Cleanup overlapping KIP changes (#5804)
Reviewers: Bill Bejeck <bi...@confluent.io>, Guozhang Wang <gu...@confluent.io>
---
docs/streams/upgrade-guide.html | 12 +++++
.../org/apache/kafka/streams/kstream/Windows.java | 15 ------
.../org/apache/kafka/streams/state/Stores.java | 57 +++++-----------------
3 files changed, 25 insertions(+), 59 deletions(-)
diff --git a/docs/streams/upgrade-guide.html b/docs/streams/upgrade-guide.html
index b26f3c3..c0f28ef 100644
--- a/docs/streams/upgrade-guide.html
+++ b/docs/streams/upgrade-guide.html
@@ -134,6 +134,18 @@
see <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-324%3A+Add+method+to+get+metrics%28%29+in+AdminClient">KIP-324</a>
</p>
+ <p>
+ We deprecated the notion of segments in window stores as those are intended to be an implementation details.
+ Thus, method <code>Windows#segments()</code> and variable <code>Windows#segments</code> were deprecated.
+ If you implement custom windows, you should update your code accordingly.
+ Similarly, <code>WindowBytesStoreSupplier#segments()</code> was deprecated and replaced with <code>WindowBytesStoreSupplier#segmentInterval()</code>.
+ If you implement custom window store, you need to update your code accordingly.
+ Finally, <code>Stores#persistentWindowStore(...)</code> were deprecated and replaced with a new overload that does not allow to specify the number of segments any longer.
+ For more details, see <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-319%3A+Replace+segments+with+segmentInterval+in+WindowBytesStoreSupplier">KIP-319</a>
+ (note: <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-328%3A+Ability+to+suppress+updates+for+KTables">KIP-328</a> and
+ <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-358%3A+Migrate+Streams+API+to+Duration+instead+of+long+ms+times">KIP-358</a> 'overlap' with KIP-319).
+ </p>
+
<h3><a id="streams_api_changes_200" href="#streams_api_changes_200">Streams API changes in 2.0.0</a></h3>
<p>
In 2.0.0 we have added a few new APIs on the <code>ReadOnlyWindowStore</code> interface (for details please read <a href="#streams_api_changes_200">Streams API changes</a> below).
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Windows.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Windows.java
index 4dfba23..feaee1e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/Windows.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Windows.java
@@ -84,21 +84,6 @@ public abstract class Windows<W extends Window> {
}
/**
- * Return the segment interval in milliseconds.
- *
- * @return the segment interval
- * @deprecated since 2.1. Instead, directly configure the segment interval in a store supplier and use {@link Materialized#as(WindowBytesStoreSupplier)}.
- */
- @Deprecated
- public long segmentInterval() {
- // Pinned arbitrarily to a minimum of 60 seconds. Profiling may indicate a different value is more efficient.
- final long minimumSegmentInterval = 60_000L;
- // Scaled to the (possibly overridden) retention period
- return Math.max(maintainMs() / (segments - 1), minimumSegmentInterval);
- }
-
-
- /**
* Set the number of segments to be used for rolling the window store.
* This function is not exposed to users but can be called by developers that extend this class.
*
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/Stores.java b/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
index 30e5140..f7a1824 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
@@ -16,7 +16,6 @@
*/
package org.apache.kafka.streams.state;
-import java.time.Duration;
import org.apache.kafka.common.annotation.InterfaceStability;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
@@ -32,6 +31,7 @@ import org.apache.kafka.streams.state.internals.RocksDbWindowBytesStoreSupplier;
import org.apache.kafka.streams.state.internals.SessionStoreBuilder;
import org.apache.kafka.streams.state.internals.WindowStoreBuilder;
+import java.time.Duration;
import java.util.Objects;
/**
@@ -155,7 +155,7 @@ public class Stores {
* careful to set it the same as the windowed keys you're actually storing.
* @param retainDuplicates whether or not to retain duplicates.
* @return an instance of {@link WindowBytesStoreSupplier}
- * @deprecated since 2.1 Use {@link Stores#persistentWindowStore(String, long, long, boolean, long)} instead
+ * @deprecated since 2.1 Use {@link Stores#persistentWindowStore(String, Duration, Duration, boolean)} instead
*/
@Deprecated
public static WindowBytesStoreSupplier persistentWindowStore(final String name,
@@ -188,28 +188,6 @@ public class Stores {
* @param windowSize size of the windows (cannot be negative)
* @param retainDuplicates whether or not to retain duplicates.
* @return an instance of {@link WindowBytesStoreSupplier}
- * @deprecated Use {@link #persistentWindowStore(String, Duration, Duration, boolean)} instead
- */
- @Deprecated
- public static WindowBytesStoreSupplier persistentWindowStore(final String name,
- final long retentionPeriod,
- final long windowSize,
- final boolean retainDuplicates) {
- // we're arbitrarily defaulting to segments no smaller than one minute.
- final long defaultSegmentInterval = Math.max(retentionPeriod / 2, 60_000L);
- return persistentWindowStore(name, retentionPeriod, windowSize, retainDuplicates, defaultSegmentInterval);
- }
-
- /**
- * Create a persistent {@link WindowBytesStoreSupplier}.
- * @param name name of the store (cannot be {@code null})
- * @param retentionPeriod length of time to retain data in the store (cannot be negative)
- * Note that the retention period must be at least long enough to contain the
- * windowed data's entire life cycle, from window-start through window-end,
- * and for the entire grace period.
- * @param windowSize size of the windows (cannot be negative)
- * @param retainDuplicates whether or not to retain duplicates.
- * @return an instance of {@link WindowBytesStoreSupplier}
* @throws IllegalArgumentException if {@code retentionPeriod} or {@code windowSize} can't be represented as {@code long milliseconds}
*/
public static WindowBytesStoreSupplier persistentWindowStore(final String name,
@@ -220,28 +198,16 @@ public class Stores {
ApiUtils.validateMillisecondDuration(retentionPeriod, "retentionPeriod");
ApiUtils.validateMillisecondDuration(windowSize, "windowSize");
- return persistentWindowStore(name, retentionPeriod.toMillis(), windowSize.toMillis(), retainDuplicates);
+ final long defaultSegmentInterval = Math.max(retentionPeriod.toMillis() / 2, 60_000L);
+
+ return persistentWindowStore(name, retentionPeriod.toMillis(), windowSize.toMillis(), retainDuplicates, defaultSegmentInterval);
}
- /**
- * Create a persistent {@link WindowBytesStoreSupplier}.
- * @param name name of the store (cannot be {@code null})
- * @param retentionPeriod length of time to retain data in the store (cannot be negative)
- * Note that the retention period must be at least long enough to contain the
- * windowed data's entire life cycle, from window-start through window-end,
- * and for the entire grace period.
- * @param segmentInterval size of segments in ms (cannot be negative)
- * @param windowSize size of the windows (cannot be negative)
- * @param retainDuplicates whether or not to retain duplicates.
- * @return an instance of {@link WindowBytesStoreSupplier}
- * @deprecated Use {@link #persistentWindowStore(String, Duration, Duration, boolean)} instead
- */
- @Deprecated
- public static WindowBytesStoreSupplier persistentWindowStore(final String name,
- final long retentionPeriod,
- final long windowSize,
- final boolean retainDuplicates,
- final long segmentInterval) {
+ private static WindowBytesStoreSupplier persistentWindowStore(final String name,
+ final long retentionPeriod,
+ final long windowSize,
+ final boolean retainDuplicates,
+ final long segmentInterval) {
Objects.requireNonNull(name, "name cannot be null");
if (retentionPeriod < 0L) {
throw new IllegalArgumentException("retentionPeriod cannot be negative");
@@ -269,7 +235,9 @@ public class Stores {
* windowed data's entire life cycle, from window-start through window-end,
* and for the entire grace period.
* @return an instance of a {@link SessionBytesStoreSupplier}
+ * @deprecated since 2.1 Use {@link Stores#persistentSessionStore(String, Duration)} instead
*/
+ @Deprecated
public static SessionBytesStoreSupplier persistentSessionStore(final String name,
final long retentionPeriod) {
Objects.requireNonNull(name, "name cannot be null");
@@ -288,6 +256,7 @@ public class Stores {
* and for the entire grace period.
* @return an instance of a {@link SessionBytesStoreSupplier}
*/
+ @SuppressWarnings("deprecation")
public static SessionBytesStoreSupplier persistentSessionStore(final String name,
final Duration retentionPeriod) {
ApiUtils.validateMillisecondDuration(retentionPeriod, "retentionPeriod");