You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by mj...@apache.org on 2018/10/11 02:09:14 UTC
[kafka] branch 2.1 updated: MINOR: default implementation for new window store overloads (#5759)
This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch 2.1
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.1 by this push:
new 9bc45b9 MINOR: default implementation for new window store overloads (#5759)
9bc45b9 is described below
commit 9bc45b91ee8863c268a6181a9e9fc9f6343c3c7b
Author: John Roesler <vv...@users.noreply.github.com>
AuthorDate: Wed Oct 10 21:08:21 2018 -0500
MINOR: default implementation for new window store overloads (#5759)
Reviewers: Matthias J. Sax <ma...@confluent.io>, Bill Bejeck <bi...@confluent.io>, Guozhang Wang <gu...@confluent.io>, Nikolay Izhikov <ni...@apache.org>
---
.../apache/kafka/streams/state/WindowStore.java | 24 ++++++++++++++++++++
.../state/internals/CachingWindowStore.java | 26 ----------------------
.../internals/ChangeLoggingWindowBytesStore.java | 26 ----------------------
.../state/internals/MeteredWindowStore.java | 23 -------------------
.../state/internals/RocksDBWindowStore.java | 23 -------------------
5 files changed, 24 insertions(+), 98 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/WindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/WindowStore.java
index ad74ae1..50ce386 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/WindowStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/WindowStore.java
@@ -17,9 +17,12 @@
package org.apache.kafka.streams.state;
import org.apache.kafka.streams.errors.InvalidStateStoreException;
+import org.apache.kafka.streams.internals.ApiUtils;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.processor.StateStore;
+import java.time.Instant;
+
/**
* A windowed store interface extending {@link StateStore}.
*
@@ -87,6 +90,13 @@ public interface WindowStore<K, V> extends StateStore, ReadOnlyWindowStore<K, V>
*/
WindowStoreIterator<V> fetch(K key, long timeFrom, long timeTo);
+ @Override
+ default WindowStoreIterator<V> fetch(final K key, final Instant from, final Instant to) {
+ ApiUtils.validateMillisecondInstant(from, "from");
+ ApiUtils.validateMillisecondInstant(to, "to");
+ return fetch(key, from.toEpochMilli(), to.toEpochMilli());
+ }
+
/**
* Get all the key-value pairs in the given key range and time range from all the existing windows.
* <p>
@@ -102,6 +112,13 @@ public interface WindowStore<K, V> extends StateStore, ReadOnlyWindowStore<K, V>
*/
KeyValueIterator<Windowed<K>, V> fetch(K from, K to, long timeFrom, long timeTo);
+ @Override
+ default KeyValueIterator<Windowed<K>, V> fetch(final K from, final K to, final Instant fromTime, final Instant toTime) {
+ ApiUtils.validateMillisecondInstant(fromTime, "fromTime");
+ ApiUtils.validateMillisecondInstant(toTime, "toTime");
+ return fetch(from, to, fromTime.toEpochMilli(), toTime.toEpochMilli());
+ }
+
/**
* Gets all the key-value pairs that belong to the windows within in the given time range.
*
@@ -112,4 +129,11 @@ public interface WindowStore<K, V> extends StateStore, ReadOnlyWindowStore<K, V>
* @throws NullPointerException if {@code null} is used for any key
*/
KeyValueIterator<Windowed<K>, V> fetchAll(long timeFrom, long timeTo);
+
+ @Override
+ default KeyValueIterator<Windowed<K>, V> fetchAll(final Instant from, final Instant to) {
+ ApiUtils.validateMillisecondInstant(from, "from");
+ ApiUtils.validateMillisecondInstant(to, "to");
+ return fetchAll(from.toEpochMilli(), to.toEpochMilli());
+ }
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
index f6b62b2..b55e544 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
@@ -16,11 +16,9 @@
*/
package org.apache.kafka.streams.state.internals;
-import java.time.Instant;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
-import org.apache.kafka.streams.internals.ApiUtils;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.kstream.internals.CacheFlushListener;
import org.apache.kafka.streams.processor.ProcessorContext;
@@ -206,13 +204,6 @@ class CachingWindowStore<K, V> extends WrappedStateStore.AbstractStateStore impl
}
@Override
- public WindowStoreIterator<byte[]> fetch(final Bytes key, final Instant from, final Instant to) throws IllegalArgumentException {
- ApiUtils.validateMillisecondInstant(from, "from");
- ApiUtils.validateMillisecondInstant(to, "to");
- return fetch(key, from.toEpochMilli(), to.toEpochMilli());
- }
-
- @Override
public KeyValueIterator<Windowed<Bytes>, byte[]> fetch(final Bytes from, final Bytes to, final long timeFrom, final long timeTo) {
// since this function may not access the underlying inner store, we need to validate
// if store is open outside as well.
@@ -241,16 +232,6 @@ class CachingWindowStore<K, V> extends WrappedStateStore.AbstractStateStore impl
);
}
- @Override
- public KeyValueIterator<Windowed<Bytes>, byte[]> fetch(final Bytes from,
- final Bytes to,
- final Instant fromTime,
- final Instant toTime) throws IllegalArgumentException {
- ApiUtils.validateMillisecondInstant(fromTime, "fromTime");
- ApiUtils.validateMillisecondInstant(toTime, "toTime");
- return fetch(from, to, fromTime.toEpochMilli(), toTime.toEpochMilli());
- }
-
private V fetchPrevious(final Bytes key, final long timestamp) {
final byte[] value = underlying.fetch(key, timestamp);
if (value != null) {
@@ -294,11 +275,4 @@ class CachingWindowStore<K, V> extends WrappedStateStore.AbstractStateStore impl
cacheFunction
);
}
-
- @Override
- public KeyValueIterator<Windowed<Bytes>, byte[]> fetchAll(final Instant from, final Instant to) throws IllegalArgumentException {
- ApiUtils.validateMillisecondInstant(from, "from");
- ApiUtils.validateMillisecondInstant(to, "to");
- return fetchAll(from.toEpochMilli(), to.toEpochMilli());
- }
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStore.java
index d4e47c6..9808ca9 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStore.java
@@ -16,10 +16,8 @@
*/
package org.apache.kafka.streams.state.internals;
-import java.time.Instant;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
-import org.apache.kafka.streams.internals.ApiUtils;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateStore;
@@ -59,28 +57,11 @@ class ChangeLoggingWindowBytesStore extends WrappedStateStore.AbstractStateStore
}
@Override
- public WindowStoreIterator<byte[]> fetch(final Bytes key, final Instant from, final Instant to) throws IllegalArgumentException {
- ApiUtils.validateMillisecondInstant(from, "from");
- ApiUtils.validateMillisecondInstant(to, "to");
- return fetch(key, from.toEpochMilli(), to.toEpochMilli());
- }
-
- @Override
public KeyValueIterator<Windowed<Bytes>, byte[]> fetch(final Bytes keyFrom, final Bytes keyTo, final long from, final long to) {
return bytesStore.fetch(keyFrom, keyTo, from, to);
}
@Override
- public KeyValueIterator<Windowed<Bytes>, byte[]> fetch(final Bytes from,
- final Bytes to,
- final Instant fromTime,
- final Instant toTime) throws IllegalArgumentException {
- ApiUtils.validateMillisecondInstant(fromTime, "fromTime");
- ApiUtils.validateMillisecondInstant(toTime, "toTime");
- return fetch(from, to, fromTime.toEpochMilli(), toTime.toEpochMilli());
- }
-
- @Override
public KeyValueIterator<Windowed<Bytes>, byte[]> all() {
return bytesStore.all();
}
@@ -91,13 +72,6 @@ class ChangeLoggingWindowBytesStore extends WrappedStateStore.AbstractStateStore
}
@Override
- public KeyValueIterator<Windowed<Bytes>, byte[]> fetchAll(final Instant from, final Instant to) throws IllegalArgumentException {
- ApiUtils.validateMillisecondInstant(from, "from");
- ApiUtils.validateMillisecondInstant(to, "to");
- return fetchAll(from.toEpochMilli(), to.toEpochMilli());
- }
-
- @Override
public void put(final Bytes key, final byte[] value) {
put(key, value, context.timestamp());
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
index e1b6cd1..5162eac 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
@@ -16,12 +16,10 @@
*/
package org.apache.kafka.streams.state.internals;
-import java.time.Instant;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.Time;
-import org.apache.kafka.streams.internals.ApiUtils;
import org.apache.kafka.streams.errors.ProcessorStateException;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.processor.ProcessorContext;
@@ -150,13 +148,6 @@ public class MeteredWindowStore<K, V> extends WrappedStateStore.AbstractStateSto
}
@Override
- public WindowStoreIterator<V> fetch(final K key, final Instant from, final Instant to) throws IllegalArgumentException {
- ApiUtils.validateMillisecondInstant(from, "from");
- ApiUtils.validateMillisecondInstant(to, "to");
- return fetch(key, from.toEpochMilli(), to.toEpochMilli());
- }
-
- @Override
public KeyValueIterator<Windowed<K>, V> all() {
return new MeteredWindowedKeyValueIterator<>(inner.all(), fetchTime, metrics, serdes, time);
}
@@ -171,13 +162,6 @@ public class MeteredWindowStore<K, V> extends WrappedStateStore.AbstractStateSto
}
@Override
- public KeyValueIterator<Windowed<K>, V> fetchAll(final Instant from, final Instant to) throws IllegalArgumentException {
- ApiUtils.validateMillisecondInstant(from, "from");
- ApiUtils.validateMillisecondInstant(to, "to");
- return fetchAll(from.toEpochMilli(), to.toEpochMilli());
- }
-
- @Override
public KeyValueIterator<Windowed<K>, V> fetch(final K from, final K to, final long timeFrom, final long timeTo) {
return new MeteredWindowedKeyValueIterator<>(inner.fetch(keyBytes(from), keyBytes(to), timeFrom, timeTo),
fetchTime,
@@ -187,13 +171,6 @@ public class MeteredWindowStore<K, V> extends WrappedStateStore.AbstractStateSto
}
@Override
- public KeyValueIterator<Windowed<K>, V> fetch(final K from, final K to, final Instant fromTime, final Instant toTime) throws IllegalArgumentException {
- ApiUtils.validateMillisecondInstant(fromTime, "fromTime");
- ApiUtils.validateMillisecondInstant(toTime, "toTime");
- return fetch(from, to, fromTime.toEpochMilli(), toTime.toEpochMilli());
- }
-
- @Override
public void flush() {
final long startNs = time.nanoseconds();
try {
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java
index e8037bc..d7bb523 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java
@@ -16,10 +16,8 @@
*/
package org.apache.kafka.streams.state.internals;
-import java.time.Instant;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.utils.Bytes;
-import org.apache.kafka.streams.internals.ApiUtils;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateStore;
@@ -94,26 +92,12 @@ public class RocksDBWindowStore<K, V> extends WrappedStateStore.AbstractStateSto
}
@Override
- public WindowStoreIterator<V> fetch(final K key, final Instant from, final Instant to) throws IllegalArgumentException {
- ApiUtils.validateMillisecondInstant(from, "from");
- ApiUtils.validateMillisecondInstant(to, "to");
- return fetch(key, from.toEpochMilli(), to.toEpochMilli());
- }
-
- @Override
public KeyValueIterator<Windowed<K>, V> fetch(final K from, final K to, final long timeFrom, final long timeTo) {
final KeyValueIterator<Bytes, byte[]> bytesIterator = bytesStore.fetch(Bytes.wrap(serdes.rawKey(from)), Bytes.wrap(serdes.rawKey(to)), timeFrom, timeTo);
return new WindowStoreIteratorWrapper<>(bytesIterator, serdes, windowSize).keyValueIterator();
}
@Override
- public KeyValueIterator<Windowed<K>, V> fetch(final K from, final K to, final Instant fromTime, final Instant toTime) throws IllegalArgumentException {
- ApiUtils.validateMillisecondInstant(fromTime, "fromTime");
- ApiUtils.validateMillisecondInstant(toTime, "toTime");
- return fetch(from, to, fromTime.toEpochMilli(), toTime.toEpochMilli());
- }
-
- @Override
public KeyValueIterator<Windowed<K>, V> all() {
final KeyValueIterator<Bytes, byte[]> bytesIterator = bytesStore.all();
return new WindowStoreIteratorWrapper<>(bytesIterator, serdes, windowSize).keyValueIterator();
@@ -125,13 +109,6 @@ public class RocksDBWindowStore<K, V> extends WrappedStateStore.AbstractStateSto
return new WindowStoreIteratorWrapper<>(bytesIterator, serdes, windowSize).keyValueIterator();
}
- @Override
- public KeyValueIterator<Windowed<K>, V> fetchAll(final Instant from, final Instant to) throws IllegalArgumentException {
- ApiUtils.validateMillisecondInstant(from, "from");
- ApiUtils.validateMillisecondInstant(to, "to");
- return fetchAll(from.toEpochMilli(), to.toEpochMilli());
- }
-
private void maybeUpdateSeqnumForDups() {
if (retainDuplicates) {
seqnum = (seqnum + 1) & 0x7FFFFFFF;