You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by mj...@apache.org on 2018/10/16 03:56:38 UTC

[kafka] branch trunk updated: KAFKA-7080 and KAFKA-7222: Cleanup overlapping KIP changes (#5804)

This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 2646781  KAFKA-7080 and KAFKA-7222: Cleanup overlapping KIP changes (#5804)
2646781 is described below

commit 2646781d3265c8269f35d39c199dfddf14d3c2a8
Author: Matthias J. Sax <mj...@apache.org>
AuthorDate: Mon Oct 15 20:56:30 2018 -0700

    KAFKA-7080 and KAFKA-7222: Cleanup overlapping KIP changes (#5804)
    
    Reviewers: Bill Bejeck <bi...@confluent.io>, Guozhang Wang <gu...@confluent.io>
---
 docs/streams/upgrade-guide.html                    | 12 +++++
 .../org/apache/kafka/streams/kstream/Windows.java  | 15 ------
 .../org/apache/kafka/streams/state/Stores.java     | 57 +++++-----------------
 3 files changed, 25 insertions(+), 59 deletions(-)

diff --git a/docs/streams/upgrade-guide.html b/docs/streams/upgrade-guide.html
index b26f3c3..c0f28ef 100644
--- a/docs/streams/upgrade-guide.html
+++ b/docs/streams/upgrade-guide.html
@@ -134,6 +134,18 @@
         see <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-324%3A+Add+method+to+get+metrics%28%29+in+AdminClient">KIP-324</a>
     </p>
 
+    <p>
+        We deprecated the notion of segments in window stores as those are intended to be an implementation detail.
+        Thus, method <code>Windows#segments()</code> and variable <code>Windows#segments</code> were deprecated.
+        If you implement custom windows, you should update your code accordingly.
+        Similarly, <code>WindowBytesStoreSupplier#segments()</code> was deprecated and replaced with <code>WindowBytesStoreSupplier#segmentInterval()</code>.
+        If you implement a custom window store, you need to update your code accordingly.
+        Finally, <code>Stores#persistentWindowStore(...)</code> was deprecated and replaced with a new overload that no longer allows specifying the number of segments.
+        For more details, see <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-319%3A+Replace+segments+with+segmentInterval+in+WindowBytesStoreSupplier">KIP-319</a>
+        (note: <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-328%3A+Ability+to+suppress+updates+for+KTables">KIP-328</a> and 
+        <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-358%3A+Migrate+Streams+API+to+Duration+instead+of+long+ms+times">KIP-358</a> 'overlap' with KIP-319).
+    </p>
+
     <h3><a id="streams_api_changes_200" href="#streams_api_changes_200">Streams API changes in 2.0.0</a></h3>
     <p>
         In 2.0.0 we have added a few new APIs on the <code>ReadOnlyWindowStore</code> interface (for details please read <a href="#streams_api_changes_200">Streams API changes</a> below).
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Windows.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Windows.java
index 4dfba23..feaee1e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/Windows.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Windows.java
@@ -84,21 +84,6 @@ public abstract class Windows<W extends Window> {
     }
 
     /**
-     * Return the segment interval in milliseconds.
-     *
-     * @return the segment interval
-     * @deprecated since 2.1. Instead, directly configure the segment interval in a store supplier and use {@link Materialized#as(WindowBytesStoreSupplier)}.
-     */
-    @Deprecated
-    public long segmentInterval() {
-        // Pinned arbitrarily to a minimum of 60 seconds. Profiling may indicate a different value is more efficient.
-        final long minimumSegmentInterval = 60_000L;
-        // Scaled to the (possibly overridden) retention period
-        return Math.max(maintainMs() / (segments - 1), minimumSegmentInterval);
-    }
-
-
-    /**
      * Set the number of segments to be used for rolling the window store.
      * This function is not exposed to users but can be called by developers that extend this class.
      *
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/Stores.java b/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
index 30e5140..f7a1824 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
@@ -16,7 +16,6 @@
  */
 package org.apache.kafka.streams.state;
 
-import java.time.Duration;
 import org.apache.kafka.common.annotation.InterfaceStability;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
@@ -32,6 +31,7 @@ import org.apache.kafka.streams.state.internals.RocksDbWindowBytesStoreSupplier;
 import org.apache.kafka.streams.state.internals.SessionStoreBuilder;
 import org.apache.kafka.streams.state.internals.WindowStoreBuilder;
 
+import java.time.Duration;
 import java.util.Objects;
 
 /**
@@ -155,7 +155,7 @@ public class Stores {
      *                              careful to set it the same as the windowed keys you're actually storing.
      * @param retainDuplicates      whether or not to retain duplicates.
      * @return an instance of {@link WindowBytesStoreSupplier}
-     * @deprecated since 2.1 Use {@link Stores#persistentWindowStore(String, long, long, boolean, long)} instead
+     * @deprecated since 2.1 Use {@link Stores#persistentWindowStore(String, Duration, Duration, boolean)} instead
      */
     @Deprecated
     public static WindowBytesStoreSupplier persistentWindowStore(final String name,
@@ -188,28 +188,6 @@ public class Stores {
      * @param windowSize            size of the windows (cannot be negative)
      * @param retainDuplicates      whether or not to retain duplicates.
      * @return an instance of {@link WindowBytesStoreSupplier}
-     * @deprecated Use {@link #persistentWindowStore(String, Duration, Duration, boolean)} instead
-     */
-    @Deprecated
-    public static WindowBytesStoreSupplier persistentWindowStore(final String name,
-                                                                 final long retentionPeriod,
-                                                                 final long windowSize,
-                                                                 final boolean retainDuplicates) {
-        // we're arbitrarily defaulting to segments no smaller than one minute.
-        final long defaultSegmentInterval = Math.max(retentionPeriod / 2, 60_000L);
-        return persistentWindowStore(name, retentionPeriod, windowSize, retainDuplicates, defaultSegmentInterval);
-    }
-
-    /**
-     * Create a persistent {@link WindowBytesStoreSupplier}.
-     * @param name                  name of the store (cannot be {@code null})
-     * @param retentionPeriod       length of time to retain data in the store (cannot be negative)
-     *                              Note that the retention period must be at least long enough to contain the
-     *                              windowed data's entire life cycle, from window-start through window-end,
-     *                              and for the entire grace period.
-     * @param windowSize            size of the windows (cannot be negative)
-     * @param retainDuplicates      whether or not to retain duplicates.
-     * @return an instance of {@link WindowBytesStoreSupplier}
      * @throws IllegalArgumentException if {@code retentionPeriod} or {@code windowSize} can't be represented as {@code long milliseconds}
      */
     public static WindowBytesStoreSupplier persistentWindowStore(final String name,
@@ -220,28 +198,16 @@ public class Stores {
         ApiUtils.validateMillisecondDuration(retentionPeriod, "retentionPeriod");
         ApiUtils.validateMillisecondDuration(windowSize, "windowSize");
 
-        return persistentWindowStore(name, retentionPeriod.toMillis(), windowSize.toMillis(), retainDuplicates);
+        final long defaultSegmentInterval = Math.max(retentionPeriod.toMillis() / 2, 60_000L);
+
+        return persistentWindowStore(name, retentionPeriod.toMillis(), windowSize.toMillis(), retainDuplicates, defaultSegmentInterval);
     }
 
-    /**
-     * Create a persistent {@link WindowBytesStoreSupplier}.
-     * @param name                  name of the store (cannot be {@code null})
-     * @param retentionPeriod       length of time to retain data in the store (cannot be negative)
-     *                              Note that the retention period must be at least long enough to contain the
-     *                              windowed data's entire life cycle, from window-start through window-end,
-     *                              and for the entire grace period.
-     * @param segmentInterval       size of segments in ms (cannot be negative)
-     * @param windowSize            size of the windows (cannot be negative)
-     * @param retainDuplicates      whether or not to retain duplicates.
-     * @return an instance of {@link WindowBytesStoreSupplier}
-     * @deprecated Use {@link #persistentWindowStore(String, Duration, Duration, boolean)} instead
-     */
-    @Deprecated
-    public static WindowBytesStoreSupplier persistentWindowStore(final String name,
-                                                                 final long retentionPeriod,
-                                                                 final long windowSize,
-                                                                 final boolean retainDuplicates,
-                                                                 final long segmentInterval) {
+    private static WindowBytesStoreSupplier persistentWindowStore(final String name,
+                                                                  final long retentionPeriod,
+                                                                  final long windowSize,
+                                                                  final boolean retainDuplicates,
+                                                                  final long segmentInterval) {
         Objects.requireNonNull(name, "name cannot be null");
         if (retentionPeriod < 0L) {
             throw new IllegalArgumentException("retentionPeriod cannot be negative");
@@ -269,7 +235,9 @@ public class Stores {
      *                          windowed data's entire life cycle, from window-start through window-end,
      *                          and for the entire grace period.
      * @return an instance of a {@link  SessionBytesStoreSupplier}
+     * @deprecated since 2.1 Use {@link Stores#persistentSessionStore(String, Duration)} instead
      */
+    @Deprecated
     public static SessionBytesStoreSupplier persistentSessionStore(final String name,
                                                                    final long retentionPeriod) {
         Objects.requireNonNull(name, "name cannot be null");
@@ -288,6 +256,7 @@ public class Stores {
      *                          and for the entire grace period.
      * @return an instance of a {@link  SessionBytesStoreSupplier}
      */
+    @SuppressWarnings("deprecation")
     public static SessionBytesStoreSupplier persistentSessionStore(final String name,
                                                                    final Duration retentionPeriod) {
         ApiUtils.validateMillisecondDuration(retentionPeriod, "retentionPeriod");