You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by vv...@apache.org on 2020/10/12 15:09:20 UTC
[kafka] branch 2.7 updated: KAFKA-10598: Improve IQ name and type
checks (#9408)
This is an automated email from the ASF dual-hosted git repository.
vvcephei pushed a commit to branch 2.7
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.7 by this push:
new 716d938 KAFKA-10598: Improve IQ name and type checks (#9408)
716d938 is described below
commit 716d9389623ac20f350abefe04e91036016b6cb7
Author: John Roesler <vv...@users.noreply.github.com>
AuthorDate: Mon Oct 12 09:34:32 2020 -0500
KAFKA-10598: Improve IQ name and type checks (#9408)
Previously, we would throw a confusing error, "the store has migrated,"
when users ask for a store that is not in the topology at all, or when the
type of the store doesn't match the QueryableStoreType parameter.
Adds an up-front check that the requested store is registered and also
a better error message when the QueryableStoreType parameter
doesn't match the store's type.
Reviewers: Guozhang Wang <gu...@apache.org>
---
.../org/apache/kafka/streams/KafkaStreams.java | 21 +++-
.../org/apache/kafka/streams/StreamsBuilder.java | 8 +-
.../kafka/streams/kstream/KGroupedStream.java | 12 +-
.../kafka/streams/kstream/KGroupedTable.java | 10 +-
.../kstream/SessionWindowedCogroupedKStream.java | 4 +-
.../streams/kstream/SessionWindowedKStream.java | 8 +-
.../apache/kafka/streams/kstream/Suppressed.java | 2 +-
.../kstream/TimeWindowedCogroupedKStream.java | 4 +-
.../kafka/streams/kstream/TimeWindowedKStream.java | 8 +-
.../kstream/internals/MaterializedInternal.java | 8 +-
.../processor/internals/ProcessorTopology.java | 15 ++-
.../internals/StreamThreadStateStoreProvider.java | 61 +++++-----
.../org/apache/kafka/streams/TopologyTest.java | 4 +-
.../integration/JoinStoreIntegrationTest.java | 23 +++-
...reignKeyJoinMaterializationIntegrationTest.java | 16 +--
.../integration/QueryableStateIntegrationTest.java | 128 ++++++++++++++++++---
.../integration/utils/IntegrationTestUtils.java | 30 ++++-
.../internals/InternalTopologyBuilderTest.java | 8 ++
.../StreamThreadStateStoreProviderTest.java | 43 ++++---
19 files changed, 295 insertions(+), 118 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
index 83b84dc..b5e64e8 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
@@ -153,6 +153,8 @@ public class KafkaStreams implements AutoCloseable {
private final QueryableStoreProvider queryableStoreProvider;
private final Admin adminClient;
private final StreamsMetricsImpl streamsMetrics;
+ private final ProcessorTopology taskTopology;
+ private final ProcessorTopology globalTaskTopology;
GlobalStreamThread globalStreamThread;
private KafkaStreams.StateListener stateListener;
@@ -702,7 +704,7 @@ public class KafkaStreams implements AutoCloseable {
internalTopologyBuilder.rewriteTopology(config);
// sanity check to fail-fast in case we cannot build a ProcessorTopology due to an exception
- final ProcessorTopology taskTopology = internalTopologyBuilder.buildTopology();
+ taskTopology = internalTopologyBuilder.buildTopology();
streamsMetadataState = new StreamsMetadataState(
internalTopologyBuilder,
@@ -719,7 +721,7 @@ public class KafkaStreams implements AutoCloseable {
// create the stream thread, global update thread, and cleanup thread
threads = new StreamThread[numStreamThreads];
- final ProcessorTopology globalTaskTopology = internalTopologyBuilder.buildGlobalStateTopology();
+ globalTaskTopology = internalTopologyBuilder.buildGlobalStateTopology();
final boolean hasGlobalTopology = globalTaskTopology != null;
if (numStreamThreads == 0 && !hasGlobalTopology) {
@@ -1198,10 +1200,21 @@ public class KafkaStreams implements AutoCloseable {
*
* @param storeQueryParameters the parameters used to fetch a queryable store
* @return A facade wrapping the local {@link StateStore} instances
- * @throws InvalidStateStoreException if Kafka Streams is (re-)initializing or a store with {@code storeName} and
- * {@code queryableStoreType} doesn't exist
+ * @throws InvalidStateStoreException If the specified store name does not exist in the topology
+ * or if the Streams instance isn't in a queryable state.
+ * If the store's type does not match the QueryableStoreType,
+ * the Streams instance is not in a queryable state with respect
+ * to the parameters, or if the store is not available locally, then
+ * an InvalidStateStoreException is thrown upon store access.
*/
public <T> T store(final StoreQueryParameters<T> storeQueryParameters) {
+ final String storeName = storeQueryParameters.storeName();
+ if ((taskTopology == null || !taskTopology.hasStore(storeName))
+ && (globalTaskTopology == null || !globalTaskTopology.hasStore(storeName))) {
+ throw new InvalidStateStoreException(
+ "Cannot get state store " + storeName + " because no such store is registered in the topology."
+ );
+ }
validateIsRunningOrRebalancing();
return queryableStoreProvider.getStore(storeQueryParameters);
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java b/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java
index 62f7b94..5bd9bf1 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java
@@ -259,7 +259,7 @@ public class StreamsBuilder {
* If this is not the case the returned {@link KTable} will be corrupted.
* <p>
* The resulting {@link KTable} will be materialized in a local {@link KeyValueStore} with an internal
- * store name. Note that store name may not be queriable through Interactive Queries.
+ * store name. Note that store name may not be queryable through Interactive Queries.
* An internal changelog topic is created by default. Because the source topic can
* be used for recovery, you can avoid creating the changelog topic by setting
* the {@code "topology.optimization"} to {@code "all"} in the {@link StreamsConfig}.
@@ -281,7 +281,7 @@ public class StreamsBuilder {
* If this is not the case the returned {@link KTable} will be corrupted.
* <p>
* The resulting {@link KTable} will be materialized in a local {@link KeyValueStore} with an internal
- * store name. Note that store name may not be queriable through Interactive Queries.
+ * store name. Note that store name may not be queryable through Interactive Queries.
* An internal changelog topic is created by default. Because the source topic can
* be used for recovery, you can avoid creating the changelog topic by setting
* the {@code "topology.optimization"} to {@code "all"} in the {@link StreamsConfig}.
@@ -342,7 +342,7 @@ public class StreamsBuilder {
* Input {@link KeyValue records} with {@code null} key will be dropped.
* <p>
* The resulting {@link GlobalKTable} will be materialized in a local {@link KeyValueStore} with an internal
- * store name. Note that store name may not be queriable through Interactive Queries.
+ * store name. Note that store name may not be queryable through Interactive Queries.
* No internal changelog topic is created since the original input topic can be used for recovery (cf.
* methods of {@link KGroupedStream} and {@link KGroupedTable} that return a {@link KTable}).
* <p>
@@ -373,7 +373,7 @@ public class StreamsBuilder {
* Input {@link KeyValue records} with {@code null} key will be dropped.
* <p>
* The resulting {@link GlobalKTable} will be materialized in a local {@link KeyValueStore} with an internal
- * store name. Note that store name may not be queriable through Interactive Queries.
+ * store name. Note that store name may not be queryable through Interactive Queries.
* No internal changelog topic is created since the original input topic can be used for recovery (cf.
* methods of {@link KGroupedStream} and {@link KGroupedTable} that return a {@link KTable}).
* <p>
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java
index d805f8b..4ecc1e1 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java
@@ -60,7 +60,7 @@ public interface KGroupedStream<K, V> {
* user-specified in {@link StreamsConfig} via parameter
* {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "internalStoreName" is an internal name
* and "-changelog" is a fixed suffix.
- * Note that the internal store name may not be queriable through Interactive Queries.
+ * Note that the internal store name may not be queryable through Interactive Queries.
*
* You can retrieve all generated internal topic names via {@link Topology#describe()}.
*
@@ -87,7 +87,7 @@ public interface KGroupedStream<K, V> {
* user-specified in {@link StreamsConfig} via parameter
* {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "internalStoreName" is an internal name
* and "-changelog" is a fixed suffix.
- * Note that the internal store name may not be queriable through Interactive Queries.
+ * Note that the internal store name may not be queryable through Interactive Queries.
*
* You can retrieve all generated internal topic names via {@link Topology#describe()}.
*
@@ -215,7 +215,7 @@ public interface KGroupedStream<K, V> {
* user-specified in {@link StreamsConfig} via parameter
* {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "internalStoreName" is an internal name
* and "-changelog" is a fixed suffix.
- * Note that the internal store name may not be queriable through Interactive Queries.
+ * Note that the internal store name may not be queryable through Interactive Queries.
*
* You can retrieve all generated internal topic names via {@link Topology#describe()}.
*
@@ -277,7 +277,7 @@ public interface KGroupedStream<K, V> {
* user-specified in {@link StreamsConfig} via parameter
* {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "internalStoreName" is an internal name
* and "-changelog" is a fixed suffix.
- * Note that the internal store name may not be queriable through Interactive Queries.
+ * Note that the internal store name may not be queryable through Interactive Queries.
*
* You can retrieve all generated internal topic names via {@link Topology#describe()}.
*
@@ -340,7 +340,7 @@ public interface KGroupedStream<K, V> {
* user-specified in {@link StreamsConfig} via parameter
* {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "internalStoreName" is an internal name
* and "-changelog" is a fixed suffix.
- * Note that the internal store name may not be queriable through Interactive Queries.
+ * Note that the internal store name may not be queryable through Interactive Queries.
*
* You can retrieve all generated internal topic names via {@link Topology#describe()}.
*
@@ -386,7 +386,7 @@ public interface KGroupedStream<K, V> {
* user-specified in {@link StreamsConfig} via parameter
* {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "internalStoreName" is an internal name
* and "-changelog" is a fixed suffix.
- * Note that the internal store name may not be queriable through Interactive Queries.
+ * Note that the internal store name may not be queryable through Interactive Queries.
*
* You can retrieve all generated internal topic names via {@link Topology#describe()}.
*
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedTable.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedTable.java
index 79bdef4..e193bd1 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedTable.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedTable.java
@@ -146,7 +146,7 @@ public interface KGroupedTable<K, V> {
* user-specified in {@link StreamsConfig} via parameter
* {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "internalStoreName" is an internal name
* and "-changelog" is a fixed suffix.
- * Note that the internal store name may not be queriable through Interactive Queries.
+ * Note that the internal store name may not be queryable through Interactive Queries.
*
* You can retrieve all generated internal topic names via {@link Topology#describe()}.
*
@@ -175,7 +175,7 @@ public interface KGroupedTable<K, V> {
* user-specified in {@link StreamsConfig} via parameter
* {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "internalStoreName" is an internal name
* and "-changelog" is a fixed suffix.
- * Note that the internal store name may not be queriable through Interactive Queries.
+ * Note that the internal store name may not be queryable through Interactive Queries.
*
* You can retrieve all generated internal topic names via {@link Topology#describe()}.
*
@@ -376,7 +376,7 @@ public interface KGroupedTable<K, V> {
* user-specified in {@link StreamsConfig} via parameter
* {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "internalStoreName" is an internal name
* and "-changelog" is a fixed suffix.
- * Note that the internal store name may not be queriable through Interactive Queries.
+ * Note that the internal store name may not be queryable through Interactive Queries.
*
* You can retrieve all generated internal topic names via {@link Topology#describe()}.
*
@@ -611,7 +611,7 @@ public interface KGroupedTable<K, V> {
* user-specified in {@link StreamsConfig} via parameter
* {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "internalStoreName" is an internal name
* and "-changelog" is a fixed suffix.
- * Note that the internal store name may not be queriable through Interactive Queries.
+ * Note that the internal store name may not be queryable through Interactive Queries.
*
* You can retrieve all generated internal topic names via {@link Topology#describe()}.
*
@@ -681,7 +681,7 @@ public interface KGroupedTable<K, V> {
* user-specified in {@link StreamsConfig} via parameter
* {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "internalStoreName" is an internal name
* and "-changelog" is a fixed suffix.
- * Note that the internal store name may not be queriable through Interactive Queries.
+ * Note that the internal store name may not be queryable through Interactive Queries.
*
* You can retrieve all generated internal topic names via {@link Topology#describe()}.
*
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindowedCogroupedKStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindowedCogroupedKStream.java
index 2a2cf34..d4f2094 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindowedCogroupedKStream.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindowedCogroupedKStream.java
@@ -85,7 +85,7 @@ public interface SessionWindowedCogroupedKStream<K, V> {
* user-specified in {@link StreamsConfig} via parameter
* {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "internalStoreName" is an internal name
* and "-changelog" is a fixed suffix.
- * Note that the internal store name may not be queriable through Interactive Queries.
+ * Note that the internal store name may not be queryable through Interactive Queries.
* <p>
* You can retrieve all generated internal topic names via {@link Topology#describe()}.
*
@@ -130,7 +130,7 @@ public interface SessionWindowedCogroupedKStream<K, V> {
* user-specified in {@link StreamsConfig} via parameter
* {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "internalStoreName" is an internal name
* and "-changelog" is a fixed suffix.
- * Note that the internal store name may not be queriable through Interactive Queries.
+ * Note that the internal store name may not be queryable through Interactive Queries.
* <p>
* You can retrieve all generated internal topic names via {@link Topology#describe()}.
*
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindowedKStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindowedKStream.java
index 13bdd17..0f8ec21 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindowedKStream.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindowedKStream.java
@@ -73,7 +73,7 @@ public interface SessionWindowedKStream<K, V> {
* user-specified in {@link StreamsConfig StreamsConfig} via parameter
* {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "internalStoreName" is an internal name
* and "-changelog" is a fixed suffix.
- * Note that the internal store name may not be queriable through Interactive Queries.
+ * Note that the internal store name may not be queryable through Interactive Queries.
* <p>
* You can retrieve all generated internal topic names via {@link Topology#describe()}.
*
@@ -103,7 +103,7 @@ public interface SessionWindowedKStream<K, V> {
* user-specified in {@link StreamsConfig StreamsConfig} via parameter
* {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "internalStoreName" is an internal name
* and "-changelog" is a fixed suffix.
- * Note that the internal store name may not be queriable through Interactive Queries.
+ * Note that the internal store name may not be queryable through Interactive Queries.
* <p>
* You can retrieve all generated internal topic names via {@link Topology#describe()}.
*
@@ -241,7 +241,7 @@ public interface SessionWindowedKStream<K, V> {
* user-specified in {@link StreamsConfig} via parameter
* {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "internalStoreName" is an internal name
* and "-changelog" is a fixed suffix.
- * Note that the internal store name may not be queriable through Interactive Queries.
+ * Note that the internal store name may not be queryable through Interactive Queries.
* <p>
* You can retrieve all generated internal topic names via {@link Topology#describe()}.
*
@@ -290,7 +290,7 @@ public interface SessionWindowedKStream<K, V> {
* user-specified in {@link StreamsConfig} via parameter
* {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "internalStoreName" is an internal name
* and "-changelog" is a fixed suffix.
- * Note that the internal store name may not be queriable through Interactive Queries.
+ * Note that the internal store name may not be queryable through Interactive Queries.
* <p>
* You can retrieve all generated internal topic names via {@link Topology#describe()}.
*
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Suppressed.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Suppressed.java
index 060b949..2f96993 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/Suppressed.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Suppressed.java
@@ -153,7 +153,7 @@ public interface Suppressed<K> extends NamedOperation<Suppressed<K>> {
* To accomplish this, the operator will buffer events from the window until the window close (that is,
* until the end-time passes, and additionally until the grace period expires). Since windowed operators
* are required to reject out-of-order events for a window whose grace period is expired, there is an additional
- * guarantee that the final results emitted from this suppression will match any queriable state upstream.
+ * guarantee that the final results emitted from this suppression will match any queryable state upstream.
*
* @param bufferConfig A configuration specifying how much space to use for buffering intermediate results.
* This is required to be a "strict" config, since it would violate the "final results"
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindowedCogroupedKStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindowedCogroupedKStream.java
index d7e261a..f010501 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindowedCogroupedKStream.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindowedCogroupedKStream.java
@@ -82,7 +82,7 @@ public interface TimeWindowedCogroupedKStream<K, V> {
* user-specified in {@link StreamsConfig} via parameter
* {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "internalStoreName" is an internal name
* and "-changelog" is a fixed suffix.
- * Note that the internal store name may not be queriable through Interactive Queries.
+ * Note that the internal store name may not be queryable through Interactive Queries.
* <p>
* You can retrieve all generated internal topic names via {@link Topology#describe()}.
*
@@ -121,7 +121,7 @@ public interface TimeWindowedCogroupedKStream<K, V> {
* user-specified in {@link StreamsConfig} via parameter
* {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "internalStoreName" is an internal name
* and "-changelog" is a fixed suffix.
- * Note that the internal store name may not be queriable through Interactive Queries.
+ * Note that the internal store name may not be queryable through Interactive Queries.
* <p>
* You can retrieve all generated internal topic names via {@link Topology#describe()}.
*
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindowedKStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindowedKStream.java
index 85f7cf8..fc53f28 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindowedKStream.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindowedKStream.java
@@ -72,7 +72,7 @@ public interface TimeWindowedKStream<K, V> {
* user-specified in {@link StreamsConfig StreamsConfig} via parameter
* {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "internalStoreName" is an internal name
* and "-changelog" is a fixed suffix.
- * Note that the internal store name may not be queriable through Interactive Queries.
+ * Note that the internal store name may not be queryable through Interactive Queries.
* <p>
* You can retrieve all generated internal topic names via {@link Topology#describe()}.
*
@@ -101,7 +101,7 @@ public interface TimeWindowedKStream<K, V> {
* user-specified in {@link StreamsConfig StreamsConfig} via parameter
* {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "internalStoreName" is an internal name
* and "-changelog" is a fixed suffix.
- * Note that the internal store name may not be queriable through Interactive Queries.
+ * Note that the internal store name may not be queryable through Interactive Queries.
* <p>
* You can retrieve all generated internal topic names via {@link Topology#describe()}.
*
@@ -239,7 +239,7 @@ public interface TimeWindowedKStream<K, V> {
* user-specified in {@link StreamsConfig} via parameter
* {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "internalStoreName" is an internal name
* and "-changelog" is a fixed suffix.
- * Note that the internal store name may not be queriable through Interactive Queries.
+ * Note that the internal store name may not be queryable through Interactive Queries.
* <p>
* You can retrieve all generated internal topic names via {@link Topology#describe()}.
*
@@ -283,7 +283,7 @@ public interface TimeWindowedKStream<K, V> {
* user-specified in {@link StreamsConfig} via parameter
* {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "internalStoreName" is an internal name
* and "-changelog" is a fixed suffix.
- * Note that the internal store name may not be queriable through Interactive Queries.
+ * Note that the internal store name may not be queryable through Interactive Queries.
* <p>
* You can retrieve all generated internal topic names via {@link Topology#describe()}.
*
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/MaterializedInternal.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/MaterializedInternal.java
index 3948705..4a3cbb2 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/MaterializedInternal.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/MaterializedInternal.java
@@ -26,7 +26,7 @@ import java.util.Map;
public class MaterializedInternal<K, V, S extends StateStore> extends Materialized<K, V, S> {
- private final boolean queriable;
+ private final boolean queryable;
public MaterializedInternal(final Materialized<K, V, S> materialized) {
this(materialized, null, null);
@@ -39,14 +39,14 @@ public class MaterializedInternal<K, V, S extends StateStore> extends Materializ
// if storeName is not provided, the corresponding KTable would never be queryable;
// but we still need to provide an internal name for it in case we materialize.
- queriable = storeName() != null;
- if (!queriable && nameProvider != null) {
+ queryable = storeName() != null;
+ if (!queryable && nameProvider != null) {
storeName = nameProvider.newStoreName(generatedStorePrefix);
}
}
public String queryableStoreName() {
- return queriable ? storeName() : null;
+ return queryable ? storeName() : null;
}
public String storeName() {
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorTopology.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorTopology.java
index 75f3688..4ee3c0e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorTopology.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorTopology.java
@@ -16,17 +16,18 @@
*/
package org.apache.kafka.streams.processor.internals;
-import java.util.ArrayList;
-import java.util.HashMap;
import org.apache.kafka.streams.processor.StateStore;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.util.ArrayList;
import java.util.Collections;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import java.util.stream.Collectors;
public class ProcessorTopology {
private final Logger log = LoggerFactory.getLogger(ProcessorTopology.class);
@@ -37,6 +38,7 @@ public class ProcessorTopology {
private final Map<String, SinkNode<?, ?, ?, ?>> sinksByTopic;
private final Set<String> terminalNodes;
private final List<StateStore> stateStores;
+ private final Set<String> stateStoreNames;
private final Set<String> repartitionTopics;
// the following contains entries for the entire topology, eg stores that do not belong to this ProcessorTopology
@@ -54,6 +56,7 @@ public class ProcessorTopology {
this.sourceNodesByTopic = new HashMap<>(sourceNodesByTopic);
this.sinksByTopic = Collections.unmodifiableMap(sinksByTopic);
this.stateStores = Collections.unmodifiableList(stateStores);
+ stateStoreNames = stateStores.stream().map(StateStore::name).collect(Collectors.toSet());
this.globalStateStores = Collections.unmodifiableList(globalStateStores);
this.storeToChangelogTopic = Collections.unmodifiableMap(storeToChangelogTopic);
this.repartitionTopics = Collections.unmodifiableSet(repartitionTopics);
@@ -103,6 +106,10 @@ public class ProcessorTopology {
return stateStores;
}
+ public boolean hasStore(final String storeName) {
+ return stateStoreNames.contains(storeName);
+ }
+
public List<StateStore> globalStateStores() {
return Collections.unmodifiableList(globalStateStores);
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProvider.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProvider.java
index d5a175d..df541d3 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProvider.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProvider.java
@@ -27,12 +27,10 @@ import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.TimestampedKeyValueStore;
import org.apache.kafka.streams.state.TimestampedWindowStore;
+import java.util.ArrayList;
+import java.util.Collection;
import java.util.Collections;
import java.util.List;
-import java.util.Collection;
-import java.util.Objects;
-import java.util.Optional;
-import java.util.stream.Collectors;
public class StreamThreadStateStoreProvider {
@@ -55,17 +53,27 @@ public class StreamThreadStateStoreProvider {
streamThread.allTasks().values() : streamThread.activeTasks();
if (storeQueryParams.partition() != null) {
- return findStreamTask(tasks, storeName, storeQueryParams.partition()).
- map(streamTask ->
- validateAndListStores(streamTask.getStore(storeName), queryableStoreType, storeName, streamTask.id())).
- map(Collections::singletonList).
- orElse(Collections.emptyList());
+ for (final Task task : tasks) {
+ if (task.id().partition == storeQueryParams.partition() &&
+ task.getStore(storeName) != null &&
+ storeName.equals(task.getStore(storeName).name())) {
+ final T typedStore = validateAndCastStores(task.getStore(storeName), queryableStoreType, storeName, task.id());
+ return Collections.singletonList(typedStore);
+ }
+ }
+ return Collections.emptyList();
} else {
- return tasks.stream().
- map(streamTask ->
- validateAndListStores(streamTask.getStore(storeName), queryableStoreType, storeName, streamTask.id())).
- filter(Objects::nonNull).
- collect(Collectors.toList());
+ final List<T> list = new ArrayList<>();
+ for (final Task task : tasks) {
+ final StateStore store = task.getStore(storeName);
+ if (store == null) {
+ // then this task doesn't have that store
+ } else {
+ final T typedStore = validateAndCastStores(store, queryableStoreType, storeName, task.id());
+ list.add(typedStore);
+ }
+ }
+ return list;
}
} else {
throw new InvalidStateStoreException("Cannot get state store " + storeName + " because the stream thread is " +
@@ -75,13 +83,18 @@ public class StreamThreadStateStoreProvider {
}
@SuppressWarnings("unchecked")
- private <T> T validateAndListStores(final StateStore store, final QueryableStoreType<T> queryableStoreType, final String storeName, final TaskId taskId) {
- if (store != null && queryableStoreType.accepts(store)) {
+ private static <T> T validateAndCastStores(final StateStore store,
+ final QueryableStoreType<T> queryableStoreType,
+ final String storeName,
+ final TaskId taskId) {
+ if (store == null) {
+ throw new NullPointerException("Expected store not to be null at this point.");
+ } else if (queryableStoreType.accepts(store)) {
if (!store.isOpen()) {
throw new InvalidStateStoreException(
"Cannot get state store " + storeName + " for task " + taskId +
" because the store is not open. " +
- "The state store may have migrated to another instances.");
+ "The state store may have migrated to another instance.");
}
if (store instanceof TimestampedKeyValueStore && queryableStoreType instanceof QueryableStoreTypes.KeyValueStoreType) {
return (T) new ReadOnlyKeyValueStoreFacade<>((TimestampedKeyValueStore<Object, Object>) store);
@@ -91,15 +104,11 @@ public class StreamThreadStateStoreProvider {
return (T) store;
}
} else {
- return null;
+ throw new InvalidStateStoreException(
+ "Cannot get state store " + storeName +
+ " because the queryable store type [" + queryableStoreType.getClass() +
+ "] does not accept the actual store type [" + store.getClass() + "]."
+ );
}
}
-
- private Optional<Task> findStreamTask(final Collection<Task> tasks, final String storeName, final int partition) {
- return tasks.stream().
- filter(streamTask -> streamTask.id().partition == partition &&
- streamTask.getStore(storeName) != null &&
- storeName.equals(streamTask.getStore(storeName).name())).
- findFirst();
- }
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java b/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java
index ef9becf..f1a7749 100644
--- a/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java
@@ -1032,7 +1032,7 @@ public class TopologyTest {
" <-- KSTREAM-SOURCE-0000000001\n" +
// previously, this was
// Processor: KTABLE-MAPVALUES-0000000004 (stores: [KTABLE-MAPVALUES-STATE-STORE-0000000003]
- // but we added a change not to materialize non-queriable stores. This change shouldn't break compatibility.
+ // but we added a change not to materialize non-queryable stores. This change shouldn't break compatibility.
" Processor: KTABLE-MAPVALUES-0000000004 (stores: [])\n" +
" --> none\n" +
" <-- KTABLE-SOURCE-0000000002\n" +
@@ -1099,7 +1099,7 @@ public class TopologyTest {
" <-- KSTREAM-SOURCE-0000000001\n" +
// Previously, this was
// Processor: KTABLE-FILTER-0000000004 (stores: [KTABLE-FILTER-STATE-STORE-0000000003]
- // but we added a change not to materialize non-queriable stores. This change shouldn't break compatibility.
+ // but we added a change not to materialize non-queryable stores. This change shouldn't break compatibility.
" Processor: KTABLE-FILTER-0000000004 (stores: [])\n" +
" --> none\n" +
" <-- KTABLE-SOURCE-0000000002\n" +
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/JoinStoreIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/JoinStoreIntegrationTest.java
index e15788a..9acc879 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/JoinStoreIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/JoinStoreIntegrationTest.java
@@ -16,12 +16,9 @@
*/
package org.apache.kafka.streams.integration;
-import java.util.Properties;
-import java.util.concurrent.CountDownLatch;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
-import org.apache.kafka.streams.StoreQueryParameters;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.InvalidStateStoreException;
@@ -30,7 +27,6 @@ import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.StreamJoined;
-import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.TestUtils;
import org.junit.After;
@@ -42,7 +38,14 @@ import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TemporaryFolder;
+import java.util.Properties;
+import java.util.concurrent.CountDownLatch;
+
import static java.time.Duration.ofMillis;
+import static org.apache.kafka.streams.StoreQueryParameters.fromNameAndType;
+import static org.apache.kafka.streams.state.QueryableStoreTypes.keyValueStore;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
import static org.junit.Assert.assertThrows;
@Category({IntegrationTest.class})
@@ -85,7 +88,7 @@ public class JoinStoreIntegrationTest {
}
@Test
- public void shouldNotAccessJoinStoresWhenGivingName() throws InterruptedException {
+ public void providingAJoinStoreNameShouldNotMakeTheJoinResultQueriable() throws InterruptedException {
STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID + "-no-store-access");
final StreamsBuilder builder = new StreamsBuilder();
@@ -108,7 +111,15 @@ public class JoinStoreIntegrationTest {
kafkaStreams.start();
latch.await();
- assertThrows(InvalidStateStoreException.class, () -> kafkaStreams.store(StoreQueryParameters.fromNameAndType("join-store", QueryableStoreTypes.keyValueStore())).get(1));
+ final InvalidStateStoreException exception =
+ assertThrows(
+ InvalidStateStoreException.class,
+ () -> kafkaStreams.store(fromNameAndType("join-store", keyValueStore()))
+ );
+ assertThat(
+ exception.getMessage(),
+ is("Cannot get state store join-store because no such store is registered in the topology.")
+ );
}
}
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinMaterializationIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinMaterializationIntegrationTest.java
index 11aa616..23870a4 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinMaterializationIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinMaterializationIntegrationTest.java
@@ -62,13 +62,13 @@ public class KTableKTableForeignKeyJoinMaterializationIntegrationTest {
private static final String RIGHT_TABLE = "right_table";
private static final String OUTPUT = "output-topic";
private final boolean materialized;
- private final boolean queriable;
+ private final boolean queryable;
private Properties streamsConfig;
- public KTableKTableForeignKeyJoinMaterializationIntegrationTest(final boolean materialized, final boolean queriable) {
+ public KTableKTableForeignKeyJoinMaterializationIntegrationTest(final boolean materialized, final boolean queryable) {
this.materialized = materialized;
- this.queriable = queriable;
+ this.queryable = queryable;
}
@Rule
@@ -85,7 +85,7 @@ public class KTableKTableForeignKeyJoinMaterializationIntegrationTest {
}
- @Parameterized.Parameters(name = "materialized={0}, queriable={1}")
+ @Parameterized.Parameters(name = "materialized={0}, queryable={1}")
public static Collection<Object[]> data() {
return Arrays.asList(
new Object[] {false, false},
@@ -108,7 +108,7 @@ public class KTableKTableForeignKeyJoinMaterializationIntegrationTest {
outputTopic.readKeyValuesToMap(),
is(emptyMap())
);
- if (materialized && queriable) {
+ if (materialized && queryable) {
assertThat(
asMap(store),
is(emptyMap())
@@ -119,7 +119,7 @@ public class KTableKTableForeignKeyJoinMaterializationIntegrationTest {
// it's not possible to know whether a result was previously emitted.
left.pipeInput("lhs1", (String) null);
{
- if (materialized && queriable) {
+ if (materialized && queryable) {
// in only this specific case, the record cache will actually be activated and
// suppress the unnecessary tombstone. This is because the cache is able to determine
// for sure that there has never been a previous result. (Because the "old" and "new" values
@@ -148,7 +148,7 @@ public class KTableKTableForeignKeyJoinMaterializationIntegrationTest {
outputTopic.readKeyValuesToMap(),
is(emptyMap())
);
- if (materialized && queriable) {
+ if (materialized && queryable) {
assertThat(
asMap(store),
is(emptyMap())
@@ -175,7 +175,7 @@ public class KTableKTableForeignKeyJoinMaterializationIntegrationTest {
final ValueJoiner<String, String, String> joiner = (value1, value2) -> "(" + value1 + "," + value2 + ")";
final Materialized<String, String, KeyValueStore<Bytes, byte[]>> materialized;
- if (queriable) {
+ if (queryable) {
materialized = Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as(queryableStoreName).withValueSerde(Serdes.String());
} else {
materialized = Materialized.with(null, Serdes.String());
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
index fe6ad61..b2680b5 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
@@ -29,13 +29,13 @@ import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KafkaStreams;
-import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.KafkaStreams.State;
import org.apache.kafka.streams.KafkaStreamsTest;
-import org.apache.kafka.streams.StreamsBuilder;
-import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.KeyQueryMetadata;
-import org.apache.kafka.streams.KafkaStreams.State;
+import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.LagInfo;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.InvalidStateStoreException;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
@@ -48,17 +48,19 @@ import org.apache.kafka.streams.kstream.Predicate;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.ValueMapper;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
-import org.apache.kafka.streams.state.StreamsMetadata;
+import org.apache.kafka.streams.state.ReadOnlySessionStore;
import org.apache.kafka.streams.state.ReadOnlyWindowStore;
+import org.apache.kafka.streams.state.StreamsMetadata;
import org.apache.kafka.streams.state.WindowStoreIterator;
-import org.apache.kafka.streams.state.KeyValueIterator;
-import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.MockMapper;
import org.apache.kafka.test.NoRetryException;
import org.apache.kafka.test.TestUtils;
+import org.hamcrest.Matchers;
import org.junit.After;
import org.junit.Before;
import org.junit.ClassRule;
@@ -97,16 +99,26 @@ import java.util.concurrent.atomic.AtomicBoolean;
import static java.time.Duration.ofMillis;
import static java.time.Duration.ofSeconds;
import static java.time.Instant.ofEpochMilli;
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+import static org.apache.kafka.common.utils.Utils.mkProperties;
import static org.apache.kafka.common.utils.Utils.mkSet;
+import static org.apache.kafka.streams.StoreQueryParameters.fromNameAndType;
+import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.getRunningStreams;
+import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.getStartedStreams;
import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.startApplicationAndWaitUntilRunning;
import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitForApplicationState;
+import static org.apache.kafka.streams.state.QueryableStoreTypes.keyValueStore;
+import static org.apache.kafka.streams.state.QueryableStoreTypes.sessionStore;
import static org.apache.kafka.test.StreamsTestUtils.startKafkaStreamsAndWaitForRunningState;
import static org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout;
import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
import static org.hamcrest.core.IsEqual.equalTo;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;
@Category({IntegrationTest.class})
@@ -305,7 +317,7 @@ public class QueryableStateIntegrationTest {
final int index = queryMetadata.activeHost().port();
final KafkaStreams streamsWithKey = pickInstanceByPort ? streamsList.get(index) : streams;
final ReadOnlyKeyValueStore<String, Long> store =
- IntegrationTestUtils.getStore(storeName, streamsWithKey, true, QueryableStoreTypes.keyValueStore());
+ IntegrationTestUtils.getStore(storeName, streamsWithKey, true, keyValueStore());
if (store == null) {
nullStoreKeys.add(key);
continue;
@@ -430,6 +442,88 @@ public class QueryableStateIntegrationTest {
}
@Test
+ public void shouldRejectNonExistentStoreName() {
+ final StreamsBuilder builder = new StreamsBuilder();
+ builder.table(
+ "input",
+ Materialized
+ .<String, String, KeyValueStore<Bytes, byte[]>>as("input-table")
+ .withKeySerde(Serdes.String())
+ .withValueSerde(Serdes.String())
+ );
+
+ final Properties properties = mkProperties(mkMap(
+ mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, safeUniqueTestName(getClass(), testName)),
+ mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers())
+ ));
+
+ try (final KafkaStreams streams = getStartedStreams(properties, builder, true)) {
+ final ReadOnlyKeyValueStore<String, String> store =
+ streams.store(fromNameAndType("input-table", keyValueStore()));
+ assertThat(store, Matchers.notNullValue());
+
+ final InvalidStateStoreException exception = assertThrows(
+ InvalidStateStoreException.class,
+ () -> streams.store(fromNameAndType("no-table", keyValueStore()))
+ );
+ assertThat(
+ exception.getMessage(),
+ is("Cannot get state store no-table because no such store is registered in the topology.")
+ );
+ }
+ }
+
+ @Test
+ public void shouldRejectWronglyTypedStore() throws InterruptedException {
+ final String uniqueTestName = safeUniqueTestName(getClass(), testName);
+ final String input = uniqueTestName + "-input";
+ final String storeName = uniqueTestName + "-input-table";
+
+ final StreamsBuilder builder = new StreamsBuilder();
+ builder.table(
+ input,
+ Materialized
+ .<String, String, KeyValueStore<Bytes, byte[]>>as(storeName)
+ .withKeySerde(Serdes.String())
+ .withValueSerde(Serdes.String())
+ );
+
+ CLUSTER.createTopic(input);
+
+ final Properties properties = mkProperties(mkMap(
+ mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, uniqueTestName + "-app"),
+ mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers())
+ ));
+
+ try (final KafkaStreams streams = getRunningStreams(properties, builder, true)) {
+ final ReadOnlyKeyValueStore<String, String> store =
+ streams.store(fromNameAndType(storeName, keyValueStore()));
+ assertThat(store, Matchers.notNullValue());
+
+ // Note that to check the type we actually need a store reference,
+ // so we can't check when you get the IQ store, only when you
+ // try to use it. Presumably, this could be improved.
+ final ReadOnlySessionStore<String, String> sessionStore =
+ streams.store(fromNameAndType(storeName, sessionStore()));
+ final InvalidStateStoreException exception = assertThrows(
+ InvalidStateStoreException.class,
+ () -> sessionStore.fetch("a")
+ );
+ assertThat(
+ exception.getMessage(),
+ is(
+ "Cannot get state store " + storeName + " because the queryable store type" +
+ " [class org.apache.kafka.streams.state.QueryableStoreTypes$SessionStoreType]" +
+ " does not accept the actual store type" +
+ " [class org.apache.kafka.streams.state.internals.MeteredTimestampedKeyValueStore]."
+ )
+ );
+ } finally {
+ CLUSTER.deleteAllTopicsAndWait(0L);
+ }
+ }
+
+ @Test
public void shouldBeAbleToQueryDuringRebalance() throws Exception {
final int numThreads = STREAM_TWO_PARTITIONS;
final List<KafkaStreams> streamsList = new ArrayList<>(numThreads);
@@ -670,10 +764,10 @@ public class QueryableStateIntegrationTest {
waitUntilAtLeastNumRecordProcessed(outputTopic, 1);
final ReadOnlyKeyValueStore<String, Long> myFilterStore =
- IntegrationTestUtils.getStore("queryFilter", kafkaStreams, QueryableStoreTypes.keyValueStore());
+ IntegrationTestUtils.getStore("queryFilter", kafkaStreams, keyValueStore());
final ReadOnlyKeyValueStore<String, Long> myFilterNotStore =
- IntegrationTestUtils.getStore("queryFilterNot", kafkaStreams, QueryableStoreTypes.keyValueStore());
+ IntegrationTestUtils.getStore("queryFilterNot", kafkaStreams, keyValueStore());
for (final KeyValue<String, Long> expectedEntry : expectedBatch1) {
TestUtils.waitForCondition(() -> expectedEntry.value.equals(myFilterStore.get(expectedEntry.key)),
@@ -737,7 +831,7 @@ public class QueryableStateIntegrationTest {
waitUntilAtLeastNumRecordProcessed(outputTopic, 5);
final ReadOnlyKeyValueStore<String, Long> myMapStore =
- IntegrationTestUtils.getStore("queryMapValues", kafkaStreams, QueryableStoreTypes.keyValueStore());
+ IntegrationTestUtils.getStore("queryMapValues", kafkaStreams, keyValueStore());
for (final KeyValue<String, String> batchEntry : batch1) {
assertEquals(Long.valueOf(batchEntry.value), myMapStore.get(batchEntry.key));
@@ -786,7 +880,7 @@ public class QueryableStateIntegrationTest {
waitUntilAtLeastNumRecordProcessed(outputTopic, 1);
final ReadOnlyKeyValueStore<String, Long> myMapStore =
- IntegrationTestUtils.getStore("queryMapValues", kafkaStreams, QueryableStoreTypes.keyValueStore());
+ IntegrationTestUtils.getStore("queryMapValues", kafkaStreams, keyValueStore());
for (final KeyValue<String, Long> expectedEntry : expectedBatch1) {
assertEquals(expectedEntry.value, myMapStore.get(expectedEntry.key));
@@ -847,7 +941,7 @@ public class QueryableStateIntegrationTest {
waitUntilAtLeastNumRecordProcessed(outputTopic, 1);
final ReadOnlyKeyValueStore<String, Long> myCount =
- IntegrationTestUtils.getStore(storeName, kafkaStreams, QueryableStoreTypes.keyValueStore());
+ IntegrationTestUtils.getStore(storeName, kafkaStreams, keyValueStore());
final ReadOnlyWindowStore<String, Long> windowStore =
IntegrationTestUtils.getStore(windowStoreName, kafkaStreams, QueryableStoreTypes.windowStore());
@@ -885,7 +979,7 @@ public class QueryableStateIntegrationTest {
final int maxWaitMs = 30000;
final ReadOnlyKeyValueStore<String, Long> store =
- IntegrationTestUtils.getStore(storeName, kafkaStreams, QueryableStoreTypes.keyValueStore());
+ IntegrationTestUtils.getStore(storeName, kafkaStreams, keyValueStore());
TestUtils.waitForCondition(
() -> Long.valueOf(8).equals(store.get("hello")),
@@ -903,7 +997,7 @@ public class QueryableStateIntegrationTest {
TestUtils.waitForCondition(
() -> {
try {
- assertEquals(8L, IntegrationTestUtils.getStore(storeName, kafkaStreams, QueryableStoreTypes.keyValueStore()).get("hello"));
+ assertEquals(8L, IntegrationTestUtils.getStore(storeName, kafkaStreams, keyValueStore()).get("hello"));
return true;
} catch (final InvalidStateStoreException ise) {
return false;
@@ -959,7 +1053,7 @@ public class QueryableStateIntegrationTest {
final int maxWaitMs = 30000;
final ReadOnlyKeyValueStore<String, String> store =
- IntegrationTestUtils.getStore(storeName, kafkaStreams, QueryableStoreTypes.keyValueStore());
+ IntegrationTestUtils.getStore(storeName, kafkaStreams, keyValueStore());
TestUtils.waitForCondition(
() -> "12".equals(store.get("a")) && "34".equals(store.get("b")),
@@ -982,7 +1076,7 @@ public class QueryableStateIntegrationTest {
"wait for thread to fail");
final ReadOnlyKeyValueStore<String, String> store2 =
- IntegrationTestUtils.getStore(storeName, kafkaStreams, QueryableStoreTypes.keyValueStore());
+ IntegrationTestUtils.getStore(storeName, kafkaStreams, keyValueStore());
try {
TestUtils.waitForCondition(
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
index d8fb5d2..03b287d 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
@@ -16,10 +16,6 @@
*/
package org.apache.kafka.streams.integration.utils;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-
import kafka.api.Request;
import kafka.server.KafkaServer;
import kafka.server.MetadataCache;
@@ -79,9 +75,13 @@ import java.util.Objects;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
@@ -1187,6 +1187,28 @@ public class IntegrationTestUtils {
return driver;
}
+ public static KafkaStreams getRunningStreams(final Properties streamsConfig,
+ final StreamsBuilder builder,
+ final boolean clean) {
+ final KafkaStreams driver = new KafkaStreams(builder.build(), streamsConfig);
+ if (clean) {
+ driver.cleanUp();
+ }
+ final CountDownLatch latch = new CountDownLatch(1);
+ driver.setStateListener((newState, oldState) -> {
+ if (newState == State.RUNNING) {
+ latch.countDown();
+ }
+ });
+ driver.start();
+ try {
+ latch.await(DEFAULT_TIMEOUT, TimeUnit.MILLISECONDS);
+ } catch (final InterruptedException e) {
+ throw new RuntimeException("Streams didn't start in time.", e);
+ }
+ return driver;
+ }
+
public static <S> S getStore(final String storeName,
final KafkaStreams streams,
final QueryableStoreType<S> storeType) throws Exception {
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java
index 0bfecc7..7166bf5 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java
@@ -47,6 +47,9 @@ import java.util.regex.Pattern;
import static java.time.Duration.ofSeconds;
import static java.util.Arrays.asList;
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+import static org.apache.kafka.common.utils.Utils.mkProperties;
import static org.apache.kafka.common.utils.Utils.mkSet;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.is;
@@ -1111,6 +1114,11 @@ public class InternalTopologyBuilderTest {
);
builder.initializeSubscription();
+ builder.rewriteTopology(new StreamsConfig(mkProperties(mkMap(
+ mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, "asdf"),
+ mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "asdf")
+ ))));
+
assertThat(builder.buildGlobalStateTopology().storeToChangelogTopic().get(globalStoreName), is(globalTopic));
}
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
index 884244d..8b463b1 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
@@ -76,8 +76,10 @@ import java.util.UUID;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.instanceOf;
+import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.not;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThrows;
public class StreamThreadStateStoreProviderTest {
@@ -206,9 +208,19 @@ public class StreamThreadStateStoreProviderTest {
@Test
public void shouldNotFindKeyValueStoresAsTimestampedStore() {
mockThread(true);
- final List<ReadOnlyKeyValueStore<String, ValueAndTimestamp<String>>> tkvStores =
- provider.stores(StoreQueryParameters.fromNameAndType("kv-store", QueryableStoreTypes.timestampedKeyValueStore()));
- assertEquals(0, tkvStores.size());
+ final InvalidStateStoreException exception = assertThrows(
+ InvalidStateStoreException.class,
+ () -> provider.stores(StoreQueryParameters.fromNameAndType("kv-store", QueryableStoreTypes.timestampedKeyValueStore()))
+ );
+ assertThat(
+ exception.getMessage(),
+ is(
+ "Cannot get state store kv-store because the queryable store type " +
+ "[class org.apache.kafka.streams.state.QueryableStoreTypes$TimestampedKeyValueStoreType] " +
+ "does not accept the actual store type " +
+ "[class org.apache.kafka.streams.state.internals.MeteredKeyValueStore]."
+ )
+ );
}
@Test
@@ -250,9 +262,19 @@ public class StreamThreadStateStoreProviderTest {
@Test
public void shouldNotFindWindowStoresAsTimestampedStore() {
mockThread(true);
- final List<ReadOnlyWindowStore<String, ValueAndTimestamp<String>>> windowStores =
- provider.stores(StoreQueryParameters.fromNameAndType("window-store", QueryableStoreTypes.timestampedWindowStore()));
- assertEquals(0, windowStores.size());
+ final InvalidStateStoreException exception = assertThrows(
+ InvalidStateStoreException.class,
+ () -> provider.stores(StoreQueryParameters.fromNameAndType("window-store", QueryableStoreTypes.timestampedWindowStore()))
+ );
+ assertThat(
+ exception.getMessage(),
+ is(
+ "Cannot get state store window-store because the queryable store type " +
+ "[class org.apache.kafka.streams.state.QueryableStoreTypes$TimestampedWindowStoreType] " +
+ "does not accept the actual store type " +
+ "[class org.apache.kafka.streams.state.internals.MeteredWindowStore]."
+ )
+ );
}
@Test
@@ -359,15 +381,6 @@ public class StreamThreadStateStoreProviderTest {
);
}
- @Test
- public void shouldReturnEmptyListIfStoreExistsButIsNotOfTypeValueStore() {
- mockThread(true);
- assertEquals(
- Collections.emptyList(),
- provider.stores(StoreQueryParameters.fromNameAndType("window-store", QueryableStoreTypes.keyValueStore()))
- );
- }
-
@Test(expected = InvalidStateStoreException.class)
public void shouldThrowInvalidStoreExceptionIfNotAllStoresAvailable() {
mockThread(false);