You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by mj...@apache.org on 2018/10/11 02:09:14 UTC

[kafka] branch 2.1 updated: MINOR: default implementation for new window store overloads (#5759)

This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch 2.1
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.1 by this push:
     new 9bc45b9  MINOR: default implementation for new window store overloads (#5759)
9bc45b9 is described below

commit 9bc45b91ee8863c268a6181a9e9fc9f6343c3c7b
Author: John Roesler <vv...@users.noreply.github.com>
AuthorDate: Wed Oct 10 21:08:21 2018 -0500

    MINOR: default implementation for new window store overloads (#5759)
    
    Reviewers: Matthias J. Sax <ma...@confluent.io>, Bill Bejeck <bi...@confluent.io>, Guozhang Wang <gu...@confluent.io>, Nikolay Izhikov <ni...@apache.org>
---
 .../apache/kafka/streams/state/WindowStore.java    | 24 ++++++++++++++++++++
 .../state/internals/CachingWindowStore.java        | 26 ----------------------
 .../internals/ChangeLoggingWindowBytesStore.java   | 26 ----------------------
 .../state/internals/MeteredWindowStore.java        | 23 -------------------
 .../state/internals/RocksDBWindowStore.java        | 23 -------------------
 5 files changed, 24 insertions(+), 98 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/state/WindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/WindowStore.java
index ad74ae1..50ce386 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/WindowStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/WindowStore.java
@@ -17,9 +17,12 @@
 package org.apache.kafka.streams.state;
 
 import org.apache.kafka.streams.errors.InvalidStateStoreException;
+import org.apache.kafka.streams.internals.ApiUtils;
 import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.streams.processor.StateStore;
 
+import java.time.Instant;
+
 /**
  * A windowed store interface extending {@link StateStore}.
  *
@@ -87,6 +90,13 @@ public interface WindowStore<K, V> extends StateStore, ReadOnlyWindowStore<K, V>
      */
     WindowStoreIterator<V> fetch(K key, long timeFrom, long timeTo);
 
+    @Override
+    default WindowStoreIterator<V> fetch(final K key, final Instant from, final Instant to) {
+        ApiUtils.validateMillisecondInstant(from, "from");
+        ApiUtils.validateMillisecondInstant(to, "to");
+        return fetch(key, from.toEpochMilli(), to.toEpochMilli());
+    }
+
     /**
      * Get all the key-value pairs in the given key range and time range from all the existing windows.
      * <p>
@@ -102,6 +112,13 @@ public interface WindowStore<K, V> extends StateStore, ReadOnlyWindowStore<K, V>
      */
     KeyValueIterator<Windowed<K>, V> fetch(K from, K to, long timeFrom, long timeTo);
 
+    @Override
+    default KeyValueIterator<Windowed<K>, V> fetch(final K from, final K to, final Instant fromTime, final Instant toTime) {
+        ApiUtils.validateMillisecondInstant(fromTime, "fromTime");
+        ApiUtils.validateMillisecondInstant(toTime, "toTime");
+        return fetch(from, to, fromTime.toEpochMilli(), toTime.toEpochMilli());
+    }
+
     /**
      * Gets all the key-value pairs that belong to the windows within in the given time range.
      *
@@ -112,4 +129,11 @@ public interface WindowStore<K, V> extends StateStore, ReadOnlyWindowStore<K, V>
      * @throws NullPointerException if {@code null} is used for any key
      */
     KeyValueIterator<Windowed<K>, V> fetchAll(long timeFrom, long timeTo);
+
+    @Override
+    default KeyValueIterator<Windowed<K>, V> fetchAll(final Instant from, final Instant to) {
+        ApiUtils.validateMillisecondInstant(from, "from");
+        ApiUtils.validateMillisecondInstant(to, "to");
+        return fetchAll(from.toEpochMilli(), to.toEpochMilli());
+    }
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
index f6b62b2..b55e544 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
@@ -16,11 +16,9 @@
  */
 package org.apache.kafka.streams.state.internals;
 
-import java.time.Instant;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.utils.Bytes;
-import org.apache.kafka.streams.internals.ApiUtils;
 import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.streams.kstream.internals.CacheFlushListener;
 import org.apache.kafka.streams.processor.ProcessorContext;
@@ -206,13 +204,6 @@ class CachingWindowStore<K, V> extends WrappedStateStore.AbstractStateStore impl
     }
 
     @Override
-    public WindowStoreIterator<byte[]> fetch(final Bytes key, final Instant from, final Instant to) throws IllegalArgumentException {
-        ApiUtils.validateMillisecondInstant(from, "from");
-        ApiUtils.validateMillisecondInstant(to, "to");
-        return fetch(key, from.toEpochMilli(), to.toEpochMilli());
-    }
-
-    @Override
     public KeyValueIterator<Windowed<Bytes>, byte[]> fetch(final Bytes from, final Bytes to, final long timeFrom, final long timeTo) {
         // since this function may not access the underlying inner store, we need to validate
         // if store is open outside as well.
@@ -241,16 +232,6 @@ class CachingWindowStore<K, V> extends WrappedStateStore.AbstractStateStore impl
         );
     }
 
-    @Override
-    public KeyValueIterator<Windowed<Bytes>, byte[]> fetch(final Bytes from,
-                                                           final Bytes to,
-                                                           final Instant fromTime,
-                                                           final Instant toTime) throws IllegalArgumentException {
-        ApiUtils.validateMillisecondInstant(fromTime, "fromTime");
-        ApiUtils.validateMillisecondInstant(toTime, "toTime");
-        return fetch(from, to, fromTime.toEpochMilli(), toTime.toEpochMilli());
-    }
-
     private V fetchPrevious(final Bytes key, final long timestamp) {
         final byte[] value = underlying.fetch(key, timestamp);
         if (value != null) {
@@ -294,11 +275,4 @@ class CachingWindowStore<K, V> extends WrappedStateStore.AbstractStateStore impl
                 cacheFunction
         );
     }
-
-    @Override
-    public KeyValueIterator<Windowed<Bytes>, byte[]> fetchAll(final Instant from, final Instant to) throws IllegalArgumentException {
-        ApiUtils.validateMillisecondInstant(from, "from");
-        ApiUtils.validateMillisecondInstant(to, "to");
-        return fetchAll(from.toEpochMilli(), to.toEpochMilli());
-    }
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStore.java
index d4e47c6..9808ca9 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStore.java
@@ -16,10 +16,8 @@
  */
 package org.apache.kafka.streams.state.internals;
 
-import java.time.Instant;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.utils.Bytes;
-import org.apache.kafka.streams.internals.ApiUtils;
 import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateStore;
@@ -59,28 +57,11 @@ class ChangeLoggingWindowBytesStore extends WrappedStateStore.AbstractStateStore
     }
 
     @Override
-    public WindowStoreIterator<byte[]> fetch(final Bytes key, final Instant from, final Instant to) throws IllegalArgumentException {
-        ApiUtils.validateMillisecondInstant(from, "from");
-        ApiUtils.validateMillisecondInstant(to, "to");
-        return fetch(key, from.toEpochMilli(), to.toEpochMilli());
-    }
-
-    @Override
     public KeyValueIterator<Windowed<Bytes>, byte[]> fetch(final Bytes keyFrom, final Bytes keyTo, final long from, final long to) {
         return bytesStore.fetch(keyFrom, keyTo, from, to);
     }
 
     @Override
-    public KeyValueIterator<Windowed<Bytes>, byte[]> fetch(final Bytes from,
-                                                           final Bytes to,
-                                                           final Instant fromTime,
-                                                           final Instant toTime) throws IllegalArgumentException {
-        ApiUtils.validateMillisecondInstant(fromTime, "fromTime");
-        ApiUtils.validateMillisecondInstant(toTime, "toTime");
-        return fetch(from, to, fromTime.toEpochMilli(), toTime.toEpochMilli());
-    }
-
-    @Override
     public KeyValueIterator<Windowed<Bytes>, byte[]> all() {
         return bytesStore.all();
     }
@@ -91,13 +72,6 @@ class ChangeLoggingWindowBytesStore extends WrappedStateStore.AbstractStateStore
     }
 
     @Override
-    public KeyValueIterator<Windowed<Bytes>, byte[]> fetchAll(final Instant from, final Instant to) throws IllegalArgumentException {
-        ApiUtils.validateMillisecondInstant(from, "from");
-        ApiUtils.validateMillisecondInstant(to, "to");
-        return fetchAll(from.toEpochMilli(), to.toEpochMilli());
-    }
-
-    @Override
     public void put(final Bytes key, final byte[] value) {
         put(key, value, context.timestamp());
     }
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
index e1b6cd1..5162eac 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
@@ -16,12 +16,10 @@
  */
 package org.apache.kafka.streams.state.internals;
 
-import java.time.Instant;
 import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.common.utils.Time;
-import org.apache.kafka.streams.internals.ApiUtils;
 import org.apache.kafka.streams.errors.ProcessorStateException;
 import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.streams.processor.ProcessorContext;
@@ -150,13 +148,6 @@ public class MeteredWindowStore<K, V> extends WrappedStateStore.AbstractStateSto
     }
 
     @Override
-    public WindowStoreIterator<V> fetch(final K key, final Instant from, final Instant to) throws IllegalArgumentException {
-        ApiUtils.validateMillisecondInstant(from, "from");
-        ApiUtils.validateMillisecondInstant(to, "to");
-        return fetch(key, from.toEpochMilli(), to.toEpochMilli());
-    }
-
-    @Override
     public KeyValueIterator<Windowed<K>, V> all() {
         return new MeteredWindowedKeyValueIterator<>(inner.all(), fetchTime, metrics, serdes, time);
     }
@@ -171,13 +162,6 @@ public class MeteredWindowStore<K, V> extends WrappedStateStore.AbstractStateSto
     }
 
     @Override
-    public KeyValueIterator<Windowed<K>, V> fetchAll(final Instant from, final Instant to) throws IllegalArgumentException {
-        ApiUtils.validateMillisecondInstant(from, "from");
-        ApiUtils.validateMillisecondInstant(to, "to");
-        return fetchAll(from.toEpochMilli(), to.toEpochMilli());
-    }
-
-    @Override
     public KeyValueIterator<Windowed<K>, V> fetch(final K from, final K to, final long timeFrom, final long timeTo) {
         return new MeteredWindowedKeyValueIterator<>(inner.fetch(keyBytes(from), keyBytes(to), timeFrom, timeTo),
                                                      fetchTime,
@@ -187,13 +171,6 @@ public class MeteredWindowStore<K, V> extends WrappedStateStore.AbstractStateSto
     }
 
     @Override
-    public KeyValueIterator<Windowed<K>, V> fetch(final K from, final K to, final Instant fromTime, final Instant toTime) throws IllegalArgumentException {
-        ApiUtils.validateMillisecondInstant(fromTime, "fromTime");
-        ApiUtils.validateMillisecondInstant(toTime, "toTime");
-        return fetch(from, to, fromTime.toEpochMilli(), toTime.toEpochMilli());
-    }
-
-    @Override
     public void flush() {
         final long startNs = time.nanoseconds();
         try {
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java
index e8037bc..d7bb523 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java
@@ -16,10 +16,8 @@
  */
 package org.apache.kafka.streams.state.internals;
 
-import java.time.Instant;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.utils.Bytes;
-import org.apache.kafka.streams.internals.ApiUtils;
 import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateStore;
@@ -94,26 +92,12 @@ public class RocksDBWindowStore<K, V> extends WrappedStateStore.AbstractStateSto
     }
 
     @Override
-    public WindowStoreIterator<V> fetch(final K key, final Instant from, final Instant to) throws IllegalArgumentException {
-        ApiUtils.validateMillisecondInstant(from, "from");
-        ApiUtils.validateMillisecondInstant(to, "to");
-        return fetch(key, from.toEpochMilli(), to.toEpochMilli());
-    }
-
-    @Override
     public KeyValueIterator<Windowed<K>, V> fetch(final K from, final K to, final long timeFrom, final long timeTo) {
         final KeyValueIterator<Bytes, byte[]> bytesIterator = bytesStore.fetch(Bytes.wrap(serdes.rawKey(from)), Bytes.wrap(serdes.rawKey(to)), timeFrom, timeTo);
         return new WindowStoreIteratorWrapper<>(bytesIterator, serdes, windowSize).keyValueIterator();
     }
 
     @Override
-    public KeyValueIterator<Windowed<K>, V> fetch(final K from, final K to, final Instant fromTime, final Instant toTime) throws IllegalArgumentException {
-        ApiUtils.validateMillisecondInstant(fromTime, "fromTime");
-        ApiUtils.validateMillisecondInstant(toTime, "toTime");
-        return fetch(from, to, fromTime.toEpochMilli(), toTime.toEpochMilli());
-    }
-
-    @Override
     public KeyValueIterator<Windowed<K>, V> all() {
         final KeyValueIterator<Bytes, byte[]> bytesIterator = bytesStore.all();
         return new WindowStoreIteratorWrapper<>(bytesIterator, serdes, windowSize).keyValueIterator();
@@ -125,13 +109,6 @@ public class RocksDBWindowStore<K, V> extends WrappedStateStore.AbstractStateSto
         return new WindowStoreIteratorWrapper<>(bytesIterator, serdes, windowSize).keyValueIterator();
     }
 
-    @Override
-    public KeyValueIterator<Windowed<K>, V> fetchAll(final Instant from, final Instant to) throws IllegalArgumentException {
-        ApiUtils.validateMillisecondInstant(from, "from");
-        ApiUtils.validateMillisecondInstant(to, "to");
-        return fetchAll(from.toEpochMilli(), to.toEpochMilli());
-    }
-
     private void maybeUpdateSeqnumForDups() {
         if (retainDuplicates) {
             seqnum = (seqnum + 1) & 0x7FFFFFFF;