You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2018/12/04 07:00:09 UTC
[kafka] branch trunk updated: KAFKA-7446: Fix the duration and
instant validation messages. (#5930)
This is an automated email from the ASF dual-hosted git repository.
guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 7283711 KAFKA-7446: Fix the duration and instant validation messages. (#5930)
7283711 is described below
commit 7283711c0d99d484b10dd7c460f7df01a319fc2b
Author: Srinivas Reddy <mr...@users.noreply.github.com>
AuthorDate: Tue Dec 4 14:59:54 2018 +0800
KAFKA-7446: Fix the duration and instant validation messages. (#5930)
Changes made as part of this commit:
- Improved the error messages of the millisecond validation utilities for better readability.
- Corrected the Java documentation on the `AdvanceInterval` check.
- Added caller-specific prefix text to make the error messages clearer to developers/users.
Reviewers: Matthias J. Sax <ma...@confluent.io>, Bill Bejeck <bi...@confluent.io>, Jacek Laskowski <ja...@japila.pl>
---
.../org/apache/kafka/streams/KafkaStreams.java | 4 ++-
.../apache/kafka/streams/internals/ApiUtils.java | 32 ++++++++++++++++------
.../apache/kafka/streams/kstream/JoinWindows.java | 13 ++++++---
.../apache/kafka/streams/kstream/Materialized.java | 6 +++-
.../kafka/streams/kstream/SessionWindows.java | 8 ++++--
.../apache/kafka/streams/kstream/TimeWindows.java | 17 ++++++++----
.../kafka/streams/kstream/UnlimitedWindows.java | 5 +++-
.../processor/internals/ProcessorContextImpl.java | 5 +++-
.../org/apache/kafka/streams/state/Stores.java | 11 ++++++--
.../apache/kafka/streams/state/WindowStore.java | 14 ++++++----
.../internals/CompositeReadOnlyWindowStore.java | 14 ++++++----
.../state/internals/ReadOnlyWindowStoreStub.java | 14 ++++++----
12 files changed, 97 insertions(+), 46 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
index c29b7bc..420c51f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
@@ -82,6 +82,7 @@ import java.util.concurrent.TimeUnit;
import static org.apache.kafka.common.utils.Utils.getHost;
import static org.apache.kafka.common.utils.Utils.getPort;
+import static org.apache.kafka.streams.internals.ApiUtils.prepareMillisCheckFailMsgPrefix;
/**
* A Kafka client that allows for performing continuous computation on input coming from one or more input topics and
@@ -921,7 +922,8 @@ public class KafkaStreams implements AutoCloseable {
* @throws IllegalArgumentException if {@code timeout} can't be represented as {@code long milliseconds}
*/
public synchronized boolean close(final Duration timeout) throws IllegalArgumentException {
- ApiUtils.validateMillisecondDuration(timeout, "timeout");
+ final String msgPrefix = prepareMillisCheckFailMsgPrefix(timeout, "timeout");
+ ApiUtils.validateMillisecondDuration(timeout, msgPrefix);
final long timeoutMs = timeout.toMillis();
if (timeoutMs < 0) {
diff --git a/streams/src/main/java/org/apache/kafka/streams/internals/ApiUtils.java b/streams/src/main/java/org/apache/kafka/streams/internals/ApiUtils.java
index e888d7a..dd3b691 100644
--- a/streams/src/main/java/org/apache/kafka/streams/internals/ApiUtils.java
+++ b/streams/src/main/java/org/apache/kafka/streams/internals/ApiUtils.java
@@ -18,43 +18,57 @@ package org.apache.kafka.streams.internals;
import java.time.Duration;
import java.time.Instant;
-import java.util.Objects;
+
+import static java.lang.String.format;
public final class ApiUtils {
+
+ private static final String MILLISECOND_VALIDATION_FAIL_MSG_FRMT = "Invalid value for parameter \"%s\" (value was: %s). ";
+
private ApiUtils() {
}
/**
* Validates that milliseconds from {@code duration} can be retrieved.
* @param duration Duration to check.
- * @param name Name of params for an error message.
+ * @param messagePrefix Prefix text for an error message.
* @return Milliseconds from {@code duration}.
*/
- public static long validateMillisecondDuration(final Duration duration, final String name) {
+ public static long validateMillisecondDuration(final Duration duration, final String messagePrefix) {
try {
if (duration == null)
- throw new IllegalArgumentException("[" + Objects.toString(name) + "] shouldn't be null.");
+ throw new IllegalArgumentException(messagePrefix + "It shouldn't be null.");
return duration.toMillis();
} catch (final ArithmeticException e) {
- throw new IllegalArgumentException("[" + name + "] can't be converted to milliseconds. ", e);
+ throw new IllegalArgumentException(messagePrefix + "It can't be converted to milliseconds.", e);
}
}
/**
* Validates that milliseconds from {@code instant} can be retrieved.
* @param instant Instant to check.
- * @param name Name of params for an error message.
+ * @param messagePrefix Prefix text for an error message.
* @return Milliseconds from {@code instant}.
*/
- public static long validateMillisecondInstant(final Instant instant, final String name) {
+ public static long validateMillisecondInstant(final Instant instant, final String messagePrefix) {
try {
if (instant == null)
- throw new IllegalArgumentException("[" + name + "] shouldn't be null.");
+ throw new IllegalArgumentException(messagePrefix + "It shouldn't be null.");
return instant.toEpochMilli();
} catch (final ArithmeticException e) {
- throw new IllegalArgumentException("[" + name + "] can't be converted to milliseconds. ", e);
+ throw new IllegalArgumentException(messagePrefix + "It can't be converted to milliseconds.", e);
}
}
+
+ /**
+ * Generates the prefix message for validateMillisecondXXXXXX() utility
+ * @param value Object to be converted to milliseconds
+ * @param name Object name
+ * @return Error message prefix to use in exception
+ */
+ public static String prepareMillisCheckFailMsgPrefix(final Object value, final String name) {
+ return format(MILLISECOND_VALIDATION_FAIL_MSG_FRMT, name, value);
+ }
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java b/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java
index 2087009..8256890 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java
@@ -23,6 +23,7 @@ import java.time.Duration;
import java.util.Map;
import java.util.Objects;
+import static org.apache.kafka.streams.internals.ApiUtils.prepareMillisCheckFailMsgPrefix;
import static org.apache.kafka.streams.kstream.internals.WindowingDefaults.DEFAULT_RETENTION_MS;
/**
@@ -128,7 +129,8 @@ public final class JoinWindows extends Windows<Window> {
* @throws IllegalArgumentException if {@code timeDifference} is negative or can't be represented as {@code long milliseconds}
*/
public static JoinWindows of(final Duration timeDifference) throws IllegalArgumentException {
- ApiUtils.validateMillisecondDuration(timeDifference, "timeDifference");
+ final String msgPrefix = prepareMillisCheckFailMsgPrefix(timeDifference, "timeDifference");
+ ApiUtils.validateMillisecondDuration(timeDifference, msgPrefix);
return of(timeDifference.toMillis());
}
@@ -161,7 +163,8 @@ public final class JoinWindows extends Windows<Window> {
*/
@SuppressWarnings({"deprecation"}) // removing segments from Windows will fix this
public JoinWindows before(final Duration timeDifference) throws IllegalArgumentException {
- ApiUtils.validateMillisecondDuration(timeDifference, "timeDifference");
+ final String msgPrefix = prepareMillisCheckFailMsgPrefix(timeDifference, "timeDifference");
+ ApiUtils.validateMillisecondDuration(timeDifference, msgPrefix);
return before(timeDifference.toMillis());
}
@@ -194,7 +197,8 @@ public final class JoinWindows extends Windows<Window> {
*/
@SuppressWarnings({"deprecation"}) // removing segments from Windows will fix this
public JoinWindows after(final Duration timeDifference) throws IllegalArgumentException {
- ApiUtils.validateMillisecondDuration(timeDifference, "timeDifference");
+ final String msgPrefix = prepareMillisCheckFailMsgPrefix(timeDifference, "timeDifference");
+ ApiUtils.validateMillisecondDuration(timeDifference, msgPrefix);
return after(timeDifference.toMillis());
}
@@ -226,7 +230,8 @@ public final class JoinWindows extends Windows<Window> {
*/
@SuppressWarnings({"deprecation"}) // removing segments from Windows will fix this
public JoinWindows grace(final Duration afterWindowEnd) throws IllegalArgumentException {
- ApiUtils.validateMillisecondDuration(afterWindowEnd, "afterWindowEnd");
+ final String msgPrefix = prepareMillisCheckFailMsgPrefix(afterWindowEnd, "afterWindowEnd");
+ ApiUtils.validateMillisecondDuration(afterWindowEnd, msgPrefix);
if (afterWindowEnd.toMillis() < 0) {
throw new IllegalArgumentException("Grace period must not be negative.");
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Materialized.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Materialized.java
index a19412d..a0d6e34 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/Materialized.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Materialized.java
@@ -34,6 +34,8 @@ import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
+import static org.apache.kafka.streams.internals.ApiUtils.prepareMillisCheckFailMsgPrefix;
+
/**
* Used to describe how a {@link StateStore} should be materialized.
* You can either provide a custom {@link StateStore} backend through one of the provided methods accepting a supplier
@@ -247,7 +249,9 @@ public class Materialized<K, V, S extends StateStore> {
* @throws IllegalArgumentException if retention is negative or can't be represented as {@code long milliseconds}
*/
public Materialized<K, V, S> withRetention(final Duration retention) throws IllegalArgumentException {
- ApiUtils.validateMillisecondDuration(retention, "retention");
+ final String msgPrefix = prepareMillisCheckFailMsgPrefix(retention, "retention");
+ ApiUtils.validateMillisecondDuration(retention, msgPrefix);
+
if (retention.toMillis() < 0) {
throw new IllegalArgumentException("Retention must not be negative.");
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindows.java b/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindows.java
index 526c9d1..bdecd8c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindows.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindows.java
@@ -23,6 +23,7 @@ import org.apache.kafka.streams.state.SessionBytesStoreSupplier;
import java.time.Duration;
import java.util.Objects;
+import static org.apache.kafka.streams.internals.ApiUtils.prepareMillisCheckFailMsgPrefix;
import static org.apache.kafka.streams.kstream.internals.WindowingDefaults.DEFAULT_RETENTION_MS;
@@ -108,7 +109,8 @@ public final class SessionWindows {
* @throws IllegalArgumentException if {@code inactivityGap} is zero or negative or can't be represented as {@code long milliseconds}
*/
public static SessionWindows with(final Duration inactivityGap) {
- ApiUtils.validateMillisecondDuration(inactivityGap, "inactivityGap");
+ final String msgPrefix = prepareMillisCheckFailMsgPrefix(inactivityGap, "inactivityGap");
+ ApiUtils.validateMillisecondDuration(inactivityGap, msgPrefix);
return with(inactivityGap.toMillis());
}
@@ -145,7 +147,9 @@ public final class SessionWindows {
* @throws IllegalArgumentException if the {@code afterWindowEnd} is negative or can't be represented as {@code long milliseconds}
*/
public SessionWindows grace(final Duration afterWindowEnd) throws IllegalArgumentException {
- ApiUtils.validateMillisecondDuration(afterWindowEnd, "afterWindowEnd");
+ final String msgPrefix = prepareMillisCheckFailMsgPrefix(afterWindowEnd, "afterWindowEnd");
+ ApiUtils.validateMillisecondDuration(afterWindowEnd, msgPrefix);
+
if (afterWindowEnd.toMillis() < 0) {
throw new IllegalArgumentException("Grace period must not be negative.");
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindows.java b/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindows.java
index 46485b1..942b54d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindows.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindows.java
@@ -26,6 +26,7 @@ import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Objects;
+import static org.apache.kafka.streams.internals.ApiUtils.prepareMillisCheckFailMsgPrefix;
import static org.apache.kafka.streams.kstream.internals.WindowingDefaults.DEFAULT_RETENTION_MS;
/**
@@ -125,7 +126,8 @@ public final class TimeWindows extends Windows<TimeWindow> {
* @throws IllegalArgumentException if the specified window size is zero or negative or can't be represented as {@code long milliseconds}
*/
public static TimeWindows of(final Duration size) throws IllegalArgumentException {
- ApiUtils.validateMillisecondDuration(size, "size");
+ final String msgPrefix = prepareMillisCheckFailMsgPrefix(size, "size");
+ ApiUtils.validateMillisecondDuration(size, msgPrefix);
return of(size.toMillis());
}
@@ -138,14 +140,15 @@ public final class TimeWindows extends Windows<TimeWindow> {
*
* @param advanceMs The advance interval ("hop") in milliseconds of the window, with the requirement that {@code 0 < advanceMs <= sizeMs}.
* @return a new window definition with default maintain duration of 1 day
- * @throws IllegalArgumentException if the advance interval is negative, zero, or larger-or-equal the window size
+ * @throws IllegalArgumentException if the advance interval is negative, zero, or larger than the window size
* @deprecated Use {@link #advanceBy(Duration)} instead
*/
@SuppressWarnings("deprecation") // will be fixed when we remove segments from Windows
@Deprecated
public TimeWindows advanceBy(final long advanceMs) {
if (advanceMs <= 0 || advanceMs > sizeMs) {
- throw new IllegalArgumentException(String.format("AdvanceMs must lie within interval (0, %d].", sizeMs));
+ throw new IllegalArgumentException(String.format("Window advancement interval should be more than zero " +
+ "and less than window duration which is %d ms, but given advancement interval is: %d ms", sizeMs, advanceMs));
}
return new TimeWindows(sizeMs, advanceMs, grace, maintainDurationMs, segments);
}
@@ -159,11 +162,12 @@ public final class TimeWindows extends Windows<TimeWindow> {
*
* @param advance The advance interval ("hop") of the window, with the requirement that {@code 0 < advance.toMillis() <= sizeMs}.
* @return a new window definition with default maintain duration of 1 day
- * @throws IllegalArgumentException if the advance interval is negative, zero, or larger-or-equal the window size
+ * @throws IllegalArgumentException if the advance interval is negative, zero, or larger than the window size
*/
@SuppressWarnings("deprecation") // will be fixed when we remove segments from Windows
public TimeWindows advanceBy(final Duration advance) {
- ApiUtils.validateMillisecondDuration(advance, "advance");
+ final String msgPrefix = prepareMillisCheckFailMsgPrefix(advance, "advance");
+ ApiUtils.validateMillisecondDuration(advance, msgPrefix);
return advanceBy(advance.toMillis());
}
@@ -196,7 +200,8 @@ public final class TimeWindows extends Windows<TimeWindow> {
*/
@SuppressWarnings("deprecation") // will be fixed when we remove segments from Windows
public TimeWindows grace(final Duration afterWindowEnd) throws IllegalArgumentException {
- ApiUtils.validateMillisecondDuration(afterWindowEnd, "afterWindowEnd");
+ final String msgPrefix = prepareMillisCheckFailMsgPrefix(afterWindowEnd, "afterWindowEnd");
+ ApiUtils.validateMillisecondDuration(afterWindowEnd, msgPrefix);
if (afterWindowEnd.toMillis() < 0) {
throw new IllegalArgumentException("Grace period must not be negative.");
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/UnlimitedWindows.java b/streams/src/main/java/org/apache/kafka/streams/kstream/UnlimitedWindows.java
index 46d7270..0a45d81 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/UnlimitedWindows.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/UnlimitedWindows.java
@@ -25,6 +25,8 @@ import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
+import static org.apache.kafka.streams.internals.ApiUtils.prepareMillisCheckFailMsgPrefix;
+
/**
* The unlimited window specifications used for aggregations.
* <p>
@@ -82,7 +84,8 @@ public final class UnlimitedWindows extends Windows<UnlimitedWindow> {
* @throws IllegalArgumentException if the start time is negative or can't be represented as {@code long milliseconds}
*/
public UnlimitedWindows startOn(final Instant start) throws IllegalArgumentException {
- ApiUtils.validateMillisecondInstant(start, "start");
+ final String msgPrefix = prepareMillisCheckFailMsgPrefix(start, "start");
+ ApiUtils.validateMillisecondInstant(start, msgPrefix);
return startOn(start.toEpochMilli());
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
index 7c18117..913e34e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
@@ -31,6 +31,8 @@ import org.apache.kafka.streams.state.internals.ThreadCache;
import java.time.Duration;
import java.util.List;
+import static org.apache.kafka.streams.internals.ApiUtils.prepareMillisCheckFailMsgPrefix;
+
public class ProcessorContextImpl extends AbstractProcessorContext implements RecordCollector.Supplier {
private final StreamTask task;
@@ -164,7 +166,8 @@ public class ProcessorContextImpl extends AbstractProcessorContext implements Re
public Cancellable schedule(final Duration interval,
final PunctuationType type,
final Punctuator callback) throws IllegalArgumentException {
- ApiUtils.validateMillisecondDuration(interval, "interval");
+ final String msgPrefix = prepareMillisCheckFailMsgPrefix(interval, "interval");
+ ApiUtils.validateMillisecondDuration(interval, msgPrefix);
return schedule(interval.toMillis(), type, callback);
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/Stores.java b/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
index f7a1824..7991b0d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
@@ -34,6 +34,8 @@ import org.apache.kafka.streams.state.internals.WindowStoreBuilder;
import java.time.Duration;
import java.util.Objects;
+import static org.apache.kafka.streams.internals.ApiUtils.prepareMillisCheckFailMsgPrefix;
+
/**
* Factory for creating state stores in Kafka Streams.
* <p>
@@ -195,8 +197,10 @@ public class Stores {
final Duration windowSize,
final boolean retainDuplicates) throws IllegalArgumentException {
Objects.requireNonNull(name, "name cannot be null");
- ApiUtils.validateMillisecondDuration(retentionPeriod, "retentionPeriod");
- ApiUtils.validateMillisecondDuration(windowSize, "windowSize");
+ final String rpMsgPrefix = prepareMillisCheckFailMsgPrefix(retentionPeriod, "retentionPeriod");
+ ApiUtils.validateMillisecondDuration(retentionPeriod, rpMsgPrefix);
+ final String wsMsgPrefix = prepareMillisCheckFailMsgPrefix(windowSize, "windowSize");
+ ApiUtils.validateMillisecondDuration(windowSize, wsMsgPrefix);
final long defaultSegmentInterval = Math.max(retentionPeriod.toMillis() / 2, 60_000L);
@@ -259,7 +263,8 @@ public class Stores {
@SuppressWarnings("deprecation")
public static SessionBytesStoreSupplier persistentSessionStore(final String name,
final Duration retentionPeriod) {
- ApiUtils.validateMillisecondDuration(retentionPeriod, "retentionPeriod");
+ final String msgPrefix = prepareMillisCheckFailMsgPrefix(retentionPeriod, "retentionPeriod");
+ ApiUtils.validateMillisecondDuration(retentionPeriod, msgPrefix);
return persistentSessionStore(name, retentionPeriod.toMillis());
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/WindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/WindowStore.java
index 57792b6..cf5744b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/WindowStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/WindowStore.java
@@ -23,6 +23,8 @@ import org.apache.kafka.streams.processor.StateStore;
import java.time.Instant;
+import static org.apache.kafka.streams.internals.ApiUtils.prepareMillisCheckFailMsgPrefix;
+
/**
* A windowed store interface extending {@link StateStore}.
*
@@ -93,8 +95,8 @@ public interface WindowStore<K, V> extends StateStore, ReadOnlyWindowStore<K, V>
@Override
default WindowStoreIterator<V> fetch(final K key, final Instant from, final Instant to) {
- ApiUtils.validateMillisecondInstant(from, "from");
- ApiUtils.validateMillisecondInstant(to, "to");
+ ApiUtils.validateMillisecondInstant(from, prepareMillisCheckFailMsgPrefix(from, "from"));
+ ApiUtils.validateMillisecondInstant(to, prepareMillisCheckFailMsgPrefix(to, "to"));
return fetch(key, from.toEpochMilli(), to.toEpochMilli());
}
@@ -116,8 +118,8 @@ public interface WindowStore<K, V> extends StateStore, ReadOnlyWindowStore<K, V>
@Override
default KeyValueIterator<Windowed<K>, V> fetch(final K from, final K to, final Instant fromTime, final Instant toTime) {
- ApiUtils.validateMillisecondInstant(fromTime, "fromTime");
- ApiUtils.validateMillisecondInstant(toTime, "toTime");
+ ApiUtils.validateMillisecondInstant(fromTime, prepareMillisCheckFailMsgPrefix(fromTime, "fromTime"));
+ ApiUtils.validateMillisecondInstant(toTime, prepareMillisCheckFailMsgPrefix(toTime, "toTime"));
return fetch(from, to, fromTime.toEpochMilli(), toTime.toEpochMilli());
}
@@ -135,8 +137,8 @@ public interface WindowStore<K, V> extends StateStore, ReadOnlyWindowStore<K, V>
@Override
default KeyValueIterator<Windowed<K>, V> fetchAll(final Instant from, final Instant to) {
- ApiUtils.validateMillisecondInstant(from, "from");
- ApiUtils.validateMillisecondInstant(to, "to");
+ ApiUtils.validateMillisecondInstant(from, prepareMillisCheckFailMsgPrefix(from, "from"));
+ ApiUtils.validateMillisecondInstant(to, prepareMillisCheckFailMsgPrefix(to, "to"));
return fetchAll(from.toEpochMilli(), to.toEpochMilli());
}
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyWindowStore.java
index 84b589d..fa37f5e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyWindowStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyWindowStore.java
@@ -28,6 +28,8 @@ import org.apache.kafka.streams.state.WindowStoreIterator;
import java.util.List;
import java.util.Objects;
+import static org.apache.kafka.streams.internals.ApiUtils.prepareMillisCheckFailMsgPrefix;
+
/**
* Wrapper over the underlying {@link ReadOnlyWindowStore}s found in a {@link
* org.apache.kafka.streams.processor.internals.ProcessorTopology}
@@ -89,8 +91,8 @@ public class CompositeReadOnlyWindowStore<K, V> implements ReadOnlyWindowStore<K
@Override
public WindowStoreIterator<V> fetch(final K key, final Instant from, final Instant to) throws IllegalArgumentException {
- ApiUtils.validateMillisecondInstant(from, "from");
- ApiUtils.validateMillisecondInstant(to, "to");
+ ApiUtils.validateMillisecondInstant(from, prepareMillisCheckFailMsgPrefix(from, "from"));
+ ApiUtils.validateMillisecondInstant(to, prepareMillisCheckFailMsgPrefix(to, "to"));
return fetch(key, from.toEpochMilli(), to.toEpochMilli());
}
@@ -113,8 +115,8 @@ public class CompositeReadOnlyWindowStore<K, V> implements ReadOnlyWindowStore<K
@Override
public KeyValueIterator<Windowed<K>, V> fetch(final K from, final K to, final Instant fromTime, final Instant toTime) throws IllegalArgumentException {
- ApiUtils.validateMillisecondInstant(fromTime, "fromTime");
- ApiUtils.validateMillisecondInstant(toTime, "toTime");
+ ApiUtils.validateMillisecondInstant(fromTime, prepareMillisCheckFailMsgPrefix(fromTime, "fromTime"));
+ ApiUtils.validateMillisecondInstant(toTime, prepareMillisCheckFailMsgPrefix(toTime, "toTime"));
return fetch(from, to, fromTime.toEpochMilli(), toTime.toEpochMilli());
}
@@ -149,8 +151,8 @@ public class CompositeReadOnlyWindowStore<K, V> implements ReadOnlyWindowStore<K
@Override
public KeyValueIterator<Windowed<K>, V> fetchAll(final Instant from, final Instant to) throws IllegalArgumentException {
- ApiUtils.validateMillisecondInstant(from, "from");
- ApiUtils.validateMillisecondInstant(to, "to");
+ ApiUtils.validateMillisecondInstant(from, prepareMillisCheckFailMsgPrefix(from, "from"));
+ ApiUtils.validateMillisecondInstant(to, prepareMillisCheckFailMsgPrefix(to, "to"));
return fetchAll(from.toEpochMilli(), to.toEpochMilli());
}
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/ReadOnlyWindowStoreStub.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/ReadOnlyWindowStoreStub.java
index e2757a9..5f18be9 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/ReadOnlyWindowStoreStub.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/ReadOnlyWindowStoreStub.java
@@ -37,6 +37,8 @@ import java.util.Map.Entry;
import java.util.NavigableMap;
import java.util.TreeMap;
+import static org.apache.kafka.streams.internals.ApiUtils.prepareMillisCheckFailMsgPrefix;
+
/**
* A very simple window store stub for testing purposes.
*/
@@ -78,8 +80,8 @@ public class ReadOnlyWindowStoreStub<K, V> implements ReadOnlyWindowStore<K, V>,
@Override
public WindowStoreIterator<V> fetch(final K key, final Instant from, final Instant to) throws IllegalArgumentException {
- ApiUtils.validateMillisecondInstant(from, "from");
- ApiUtils.validateMillisecondInstant(to, "to");
+ ApiUtils.validateMillisecondInstant(from, prepareMillisCheckFailMsgPrefix(from, "from"));
+ ApiUtils.validateMillisecondInstant(to, prepareMillisCheckFailMsgPrefix(to, "to"));
return fetch(key, from.toEpochMilli(), to.toEpochMilli());
}
@@ -177,8 +179,8 @@ public class ReadOnlyWindowStoreStub<K, V> implements ReadOnlyWindowStore<K, V>,
@Override
public KeyValueIterator<Windowed<K>, V> fetchAll(final Instant from, final Instant to) throws IllegalArgumentException {
- ApiUtils.validateMillisecondInstant(from, "from");
- ApiUtils.validateMillisecondInstant(to, "to");
+ ApiUtils.validateMillisecondInstant(from, prepareMillisCheckFailMsgPrefix(from, "from"));
+ ApiUtils.validateMillisecondInstant(to, prepareMillisCheckFailMsgPrefix(to, "to"));
return fetchAll(from.toEpochMilli(), to.toEpochMilli());
}
@@ -232,8 +234,8 @@ public class ReadOnlyWindowStoreStub<K, V> implements ReadOnlyWindowStore<K, V>,
final K to,
final Instant fromTime,
final Instant toTime) throws IllegalArgumentException {
- ApiUtils.validateMillisecondInstant(fromTime, "fromTime");
- ApiUtils.validateMillisecondInstant(toTime, "toTime");
+ ApiUtils.validateMillisecondInstant(fromTime, prepareMillisCheckFailMsgPrefix(fromTime, "fromTime"));
+ ApiUtils.validateMillisecondInstant(toTime, prepareMillisCheckFailMsgPrefix(toTime, "toTime"));
return fetch(from, to, fromTime.toEpochMilli(), toTime.toEpochMilli());
}