You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by vv...@apache.org on 2021/12/08 22:52:23 UTC
[kafka] 01/01: KAFKA-13522: add position tracking and bounding to IQv2
This is an automated email from the ASF dual-hosted git repository.
vvcephei pushed a commit to branch iqv2-position-api
in repository https://gitbox.apache.org/repos/asf/kafka.git
commit bcfd086517ff94cd6e8e84421c9b3716393e622a
Author: John Roesler <vv...@apache.org>
AuthorDate: Wed Dec 8 16:12:08 2021 -0600
KAFKA-13522: add position tracking and bounding to IQv2
---
.../org/apache/kafka/streams/KafkaStreams.java | 43 ++++++++---
.../org/apache/kafka/streams/query/Position.java | 12 ++-
.../kafka/streams/query/StateQueryResult.java | 12 ++-
.../state/internals/CachingKeyValueStore.java | 4 +-
.../state/internals/InMemoryKeyValueStore.java | 7 +-
.../state/internals/InMemorySessionStore.java | 9 ++-
.../state/internals/InMemoryWindowStore.java | 4 +-
.../streams/state/internals/MemoryLRUCache.java | 7 ++
.../state/internals/MemoryNavigableLRUCache.java | 4 +-
.../state/internals/RocksDBSessionStore.java | 9 ++-
.../streams/state/internals/RocksDBStore.java | 17 +++--
.../state/internals/RocksDBWindowStore.java | 9 ++-
.../streams/state/internals/StoreQueryUtils.java | 48 +++++++++++-
.../integration/IQv2StoreIntegrationTest.java | 87 ++++++++++++++++------
.../integration/utils/IntegrationTestUtils.java | 65 ++++++++++++++++
15 files changed, 274 insertions(+), 63 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
index 0169dd9..c095b69 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
@@ -1764,19 +1764,33 @@ public class KafkaStreams implements AutoCloseable {
);
}
final StateQueryResult<R> result = new StateQueryResult<>();
+ final Set<Integer> handledPartitions = new HashSet<>();
final Map<String, StateStore> globalStateStores = topologyMetadata.globalStateStores();
if (globalStateStores.containsKey(storeName)) {
- final StateStore store = globalStateStores.get(storeName);
- final QueryResult<R> r =
- store.query(
- request.getQuery(),
- request.getPositionBound(),
- request.executionInfoEnabled()
- );
- result.setGlobalResult(r);
+ // Global stores pose one significant problem
+ // for IQv2: when they start up, they skip the regular
+ // ingest pipeline and instead use the "restoration" pipeline
+ // to read up until the current end offset. Then, they switch
+ // over to the regular ingest pipeline.
+ //
+ // IQv2 position tracking expects to track the position of each
+ // record from the input topic through the ingest pipeline and then
+ // get the position headers through the restoration pipeline via the
+ // changelog topic. The fact that global stores "restore" the input topic
+ // instead of ingesting it violates our expectations.
+ //
+ // It has also caused other problems, so we may want to consider
+ // switching the global store processing to use the normal paradigm
+ // rather than adding special-case logic to track positions in global
+ // stores.
+ result.setGlobalResult(
+ QueryResult.forFailure(
+ FailureReason.UNKNOWN_QUERY_TYPE,
+ "Global stores do not yet support the KafkaStreams#query API. Use KafkaStreams#store instead."
+ )
+ );
} else {
- final Set<Integer> handledPartitions = new HashSet<>();
for (final StreamThread thread : threads) {
final Map<TaskId, Task> tasks = thread.allTasks();
@@ -1828,6 +1842,17 @@ public class KafkaStreams implements AutoCloseable {
}
}
+ if (!request.isAllPartitions()) {
+ for (final Integer partition : request.getPartitions()) {
+ if (!result.getPartitionResults().containsKey(partition)){
+ result.addResult(partition, QueryResult.forFailure(
+ FailureReason.NOT_PRESENT,
+ "The requested partition was not present at the time of the query."
+ ));
+ }
+ }
+ }
+
return result;
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/query/Position.java b/streams/src/main/java/org/apache/kafka/streams/query/Position.java
index 951866c..88f4b49 100644
--- a/streams/src/main/java/org/apache/kafka/streams/query/Position.java
+++ b/streams/src/main/java/org/apache/kafka/streams/query/Position.java
@@ -93,8 +93,7 @@ public class Position {
}
/**
- * Create a new, structurally independent Position that is the result of merging two other
- * Positions.
+ * Merges the provided Position into the current instance.
* <p>
* If both Positions contain the same topic -> partition -> offset mapping, the resulting
* Position will contain a mapping with the larger of the two offsets.
@@ -103,12 +102,10 @@ public class Position {
if (other == null) {
return this;
} else {
- final ConcurrentHashMap<String, ConcurrentHashMap<Integer, Long>> copy =
- deepCopy(position);
for (final Entry<String, ConcurrentHashMap<Integer, Long>> entry : other.position.entrySet()) {
final String topic = entry.getKey();
final Map<Integer, Long> partitionMap =
- copy.computeIfAbsent(topic, k -> new ConcurrentHashMap<>());
+ position.computeIfAbsent(topic, k -> new ConcurrentHashMap<>());
for (final Entry<Integer, Long> partitionOffset : entry.getValue().entrySet()) {
final Integer partition = partitionOffset.getKey();
final Long offset = partitionOffset.getValue();
@@ -118,7 +115,7 @@ public class Position {
}
}
}
- return new Position(copy);
+ return this;
}
}
@@ -133,7 +130,8 @@ public class Position {
* Return the partition -> offset mapping for a specific topic.
*/
public Map<Integer, Long> getBound(final String topic) {
- return Collections.unmodifiableMap(position.get(topic));
+ final ConcurrentHashMap<Integer, Long> bound = position.get(topic);
+ return bound == null ? Collections.emptyMap() : Collections.unmodifiableMap(bound);
}
private static ConcurrentHashMap<String, ConcurrentHashMap<Integer, Long>> deepCopy(
diff --git a/streams/src/main/java/org/apache/kafka/streams/query/StateQueryResult.java b/streams/src/main/java/org/apache/kafka/streams/query/StateQueryResult.java
index 8b93bd6..c5eaa10 100644
--- a/streams/src/main/java/org/apache/kafka/streams/query/StateQueryResult.java
+++ b/streams/src/main/java/org/apache/kafka/streams/query/StateQueryResult.java
@@ -97,11 +97,15 @@ public class StateQueryResult<R> {
* prior observations.
*/
public Position getPosition() {
- Position position = Position.emptyPosition();
- for (final QueryResult<R> r : partitionResults.values()) {
- position = position.merge(r.getPosition());
+ if (globalResult != null) {
+ return globalResult.getPosition();
+ } else {
+ final Position position = Position.emptyPosition();
+ for (final QueryResult<R> r : partitionResults.values()) {
+ position.merge(r.getPosition());
+ }
+ return position;
}
- return position;
}
@Override
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
index df22e8c..66683ea 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
@@ -107,10 +107,10 @@ public class CachingKeyValueStore
// we can skip flushing to downstream as well as writing to underlying store
if (rawNewValue != null || rawOldValue != null) {
// we need to get the old values if needed, and then put to store, and then flush
- wrapped().put(entry.key(), entry.newValue());
-
final ProcessorRecordContext current = context.recordContext();
context.setRecordContext(entry.entry().context());
+ wrapped().put(entry.key(), entry.newValue());
+
try {
flushListener.apply(
new Record<>(
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java
index 41977cf..5f47c3d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java
@@ -45,13 +45,12 @@ public class InMemoryKeyValueStore implements KeyValueStore<Bytes, byte[]> {
private final String name;
private final NavigableMap<Bytes, byte[]> map = new TreeMap<>();
+ private final Position position = Position.emptyPosition();
private volatile boolean open = false;
private StateStoreContext context;
- private Position position;
public InMemoryKeyValueStore(final String name) {
this.name = name;
- this.position = Position.emptyPosition();
}
@Override
@@ -102,7 +101,9 @@ public class InMemoryKeyValueStore implements KeyValueStore<Bytes, byte[]> {
query,
positionBound,
collectExecutionInfo,
- this
+ this,
+ position,
+ context.taskId().partition()
);
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemorySessionStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemorySessionStore.java
index d09ab0f..cf351e3 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemorySessionStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemorySessionStore.java
@@ -317,7 +317,14 @@ public class InMemorySessionStore implements SessionStore<Bytes, byte[]> {
@Override
public <R> QueryResult<R> query(final Query<R> query, final PositionBound positionBound,
final boolean collectExecutionInfo) {
- return StoreQueryUtils.handleBasicQueries(query, positionBound, collectExecutionInfo, this);
+ return StoreQueryUtils.handleBasicQueries(
+ query,
+ positionBound,
+ collectExecutionInfo,
+ this,
+ position,
+ context.taskId().partition()
+ );
}
@Override
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java
index 46f4973..1b49e9e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java
@@ -357,7 +357,9 @@ public class InMemoryWindowStore implements WindowStore<Bytes, byte[]> {
query,
positionBound,
collectExecutionInfo,
- this
+ this,
+ position,
+ context.taskId().partition()
);
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java
index 22f1215..c81b527 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java
@@ -22,6 +22,7 @@ import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.query.Position;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;
@@ -35,6 +36,9 @@ import java.util.Objects;
*/
public class MemoryLRUCache implements KeyValueStore<Bytes, byte[]> {
+ protected StateStoreContext context;
+ protected Position position = Position.emptyPosition();
+
public interface EldestEntryRemovalListener {
void apply(Bytes key, byte[] value);
}
@@ -95,6 +99,7 @@ public class MemoryLRUCache implements KeyValueStore<Bytes, byte[]> {
put(Bytes.wrap(key), value);
restoring = false;
});
+ this.context = context;
}
@Override
@@ -122,6 +127,7 @@ public class MemoryLRUCache implements KeyValueStore<Bytes, byte[]> {
} else {
this.map.put(key, value);
}
+ StoreQueryUtils.updatePosition(position, context);
}
@Override
@@ -144,6 +150,7 @@ public class MemoryLRUCache implements KeyValueStore<Bytes, byte[]> {
@Override
public synchronized byte[] delete(final Bytes key) {
Objects.requireNonNull(key);
+ StoreQueryUtils.updatePosition(position, context);
return this.map.remove(key);
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryNavigableLRUCache.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryNavigableLRUCache.java
index eda880e..dd2e8a7 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryNavigableLRUCache.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryNavigableLRUCache.java
@@ -121,7 +121,9 @@ public class MemoryNavigableLRUCache extends MemoryLRUCache {
query,
positionBound,
collectExecutionInfo,
- this
+ this,
+ position,
+ context.taskId().partition()
);
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSessionStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSessionStore.java
index c171792..e6e2740 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSessionStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSessionStore.java
@@ -53,7 +53,14 @@ public class RocksDBSessionStore
@Override
public <R> QueryResult<R> query(final Query<R> query, final PositionBound positionBound,
final boolean collectExecutionInfo) {
- return StoreQueryUtils.handleBasicQueries(query, positionBound, collectExecutionInfo, this);
+ return StoreQueryUtils.handleBasicQueries(
+ query,
+ positionBound,
+ collectExecutionInfo,
+ this,
+ position,
+ stateStoreContext.taskId().partition()
+ );
}
@Override
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
index 0ab9be9..4f07a15 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
@@ -326,13 +326,18 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]>, BatchWritingS
}
@Override
- public <R> QueryResult<R> query(final Query<R> query, final PositionBound positionBound,
- final boolean collectExecutionInfo) {
+ public <R> QueryResult<R> query(
+ final Query<R> query,
+ final PositionBound positionBound,
+ final boolean collectExecutionInfo) {
+
return StoreQueryUtils.handleBasicQueries(
- query,
- positionBound,
- collectExecutionInfo,
- this
+ query,
+ positionBound,
+ collectExecutionInfo,
+ this,
+ position,
+ context.taskId().partition()
);
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java
index efcde31..40415c2 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java
@@ -131,7 +131,14 @@ public class RocksDBWindowStore
@Override
public <R> QueryResult<R> query(final Query<R> query, final PositionBound positionBound,
final boolean collectExecutionInfo) {
- return StoreQueryUtils.handleBasicQueries(query, positionBound, collectExecutionInfo, this);
+ return StoreQueryUtils.handleBasicQueries(
+ query,
+ positionBound,
+ collectExecutionInfo,
+ this,
+ position,
+ stateStoreContext.taskId().partition()
+ );
}
private void maybeUpdateSeqnumForDups() {
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreQueryUtils.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreQueryUtils.java
index 6217877..006981e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreQueryUtils.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreQueryUtils.java
@@ -24,6 +24,9 @@ import org.apache.kafka.streams.query.PositionBound;
import org.apache.kafka.streams.query.Query;
import org.apache.kafka.streams.query.QueryResult;
+import java.util.Map;
+import java.util.Map.Entry;
+
public final class StoreQueryUtils {
// make this class uninstantiable
@@ -35,13 +38,19 @@ public final class StoreQueryUtils {
final Query<R> query,
final PositionBound positionBound,
final boolean collectExecutionInfo,
- final StateStore store) {
+ final StateStore store,
+ final Position position,
+ final int partition
+ ) {
final QueryResult<R> result;
final long start = collectExecutionInfo ? System.nanoTime() : -1L;
- // TODO: position tracking
if (query instanceof PingQuery) {
- result = (QueryResult<R>) QueryResult.forResult(true);
+ if (!isPermitted(position, positionBound, partition)) {
+ result = QueryResult.notUpToBound(position, positionBound, partition);
+ } else {
+ result = (QueryResult<R>) QueryResult.forResult(true);
+ }
} else {
result = QueryResult.forUnknownQueryType(query, store);
}
@@ -50,6 +59,7 @@ public final class StoreQueryUtils {
"Handled in " + store.getClass() + " in " + (System.nanoTime() - start) + "ns"
);
}
+ result.setPosition(position);
return result;
}
@@ -62,4 +72,36 @@ public final class StoreQueryUtils {
position.withComponent(meta.topic(), meta.partition(), meta.offset());
}
}
+
+ public static boolean isPermitted(
+ final Position position,
+ final PositionBound positionBound,
+ final int partition
+ ) {
+ if (positionBound.isUnbounded()) {
+ return true;
+ } else {
+ final Position bound = positionBound.position();
+ for (final String topic : bound.getTopics()) {
+ final Map<Integer, Long> partitionBounds = bound.getBound(topic);
+ final Map<Integer, Long> seenPartitionBounds = position.getBound(topic);
+ if (!partitionBounds.containsKey(partition)) {
+ // this topic isn't bounded for our partition, so just skip over it.
+ } else {
+ if (seenPartitionBounds == null) {
+ // we haven't seen a topic that is bounded for our partition
+ return false;
+ } else if (!seenPartitionBounds.containsKey(partition)) {
+ // we haven't seen a partition that we have a bound for
+ return false;
+ } else if (seenPartitionBounds.get(partition) < partitionBounds.get(
+ partition)) {
+ // our current position is behind the bound
+ return false;
+ }
+ }
+ }
+ return true;
+ }
+ }
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/IQv2StoreIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/IQv2StoreIntegrationTest.java
index 3a6adaf..62ff17d 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/IQv2StoreIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/IQv2StoreIntegrationTest.java
@@ -37,6 +37,7 @@ import org.apache.kafka.streams.kstream.SessionWindows;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.query.FailureReason;
import org.apache.kafka.streams.query.Position;
+import org.apache.kafka.streams.query.PositionBound;
import org.apache.kafka.streams.query.Query;
import org.apache.kafka.streams.query.QueryResult;
import org.apache.kafka.streams.query.StateQueryRequest;
@@ -85,7 +86,6 @@ import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.matchesPattern;
import static org.hamcrest.Matchers.not;
-import static org.hamcrest.Matchers.nullValue;
import static org.junit.jupiter.api.Assertions.assertThrows;
@Category({IntegrationTest.class})
@@ -248,6 +248,7 @@ public class IQv2StoreIntegrationTest {
public static void before()
throws InterruptedException, IOException, ExecutionException, TimeoutException {
CLUSTER.start();
+ CLUSTER.deleteAllTopicsAndWait(60 * 1000L);
final int partitions = 2;
CLUSTER.createTopic(INPUT_TOPIC_NAME, partitions, 1);
@@ -295,8 +296,14 @@ public class IQv2StoreIntegrationTest {
@Before
public void beforeTest() {
- final StreamsBuilder builder = new StreamsBuilder();
final StoreSupplier<?> supplier = storeToTest.supplier();
+ final Properties streamsConfig = streamsConfiguration(
+ cache,
+ log,
+ storeToTest.name()
+ );
+
+ final StreamsBuilder builder = new StreamsBuilder();
if (supplier instanceof KeyValueBytesStoreSupplier) {
final Materialized<Integer, Integer, KeyValueStore<Bytes, byte[]>> materialized =
Materialized.as((KeyValueBytesStoreSupplier) supplier);
@@ -389,13 +396,10 @@ public class IQv2StoreIntegrationTest {
// Don't need to wait for running, since tests can use iqv2 to wait until they
// get a valid response.
+
kafkaStreams =
IntegrationTestUtils.getStartedStreams(
- streamsConfiguration(
- cache,
- log,
- supplier.getClass().getSimpleName()
- ),
+ streamsConfig,
builder,
true
);
@@ -414,10 +418,28 @@ public class IQv2StoreIntegrationTest {
@Test
public void verifyStore() {
- shouldRejectUnknownQuery();
- shouldHandlePingQuery();
- shouldCollectExecutionInfo();
- shouldCollectExecutionInfoUnderFailure();
+ if (storeToTest.global()) {
+ globalShouldRejectAllQueries();
+ } else {
+ shouldRejectUnknownQuery();
+ shouldHandlePingQuery();
+ shouldCollectExecutionInfo();
+ shouldCollectExecutionInfoUnderFailure();
+ }
+ }
+
+ private void globalShouldRejectAllQueries() {
+ final PingQuery query = new PingQuery();
+ final StateQueryRequest<Boolean> request = inStore(STORE_NAME).withQuery(query);
+
+ final StateQueryResult<Boolean> result = kafkaStreams.query(request);
+
+ assertThat(result.getGlobalResult().isFailure(), is(true));
+ assertThat(result.getGlobalResult().getFailureReason(),
+ is(FailureReason.UNKNOWN_QUERY_TYPE));
+ assertThat(result.getGlobalResult().getFailureMessage(),
+ is("Global stores do not yet support the KafkaStreams#query API."
+ + " Use KafkaStreams#store instead."));
}
public void shouldRejectUnknownQuery() {
@@ -435,7 +457,6 @@ public class IQv2StoreIntegrationTest {
queryResult -> {
assertThat(queryResult.isFailure(), is(true));
assertThat(queryResult.isSuccess(), is(false));
- assertThat(queryResult.getPosition(), is(nullValue()));
assertThat(queryResult.getFailureReason(),
is(FailureReason.UNKNOWN_QUERY_TYPE));
assertThat(queryResult.getFailureMessage(),
@@ -456,12 +477,18 @@ public class IQv2StoreIntegrationTest {
public void shouldHandlePingQuery() {
final PingQuery query = new PingQuery();
- final StateQueryRequest<Boolean> request =
- inStore(STORE_NAME).withQuery(query);
final Set<Integer> partitions = mkSet(0, 1);
+ final StateQueryRequest<Boolean> request =
+ inStore(STORE_NAME)
+ .withQuery(query)
+ .withPartitions(partitions)
+ .withPositionBound(PositionBound.at(INPUT_POSITION));
final StateQueryResult<Boolean> result =
- IntegrationTestUtils.iqv2WaitForPartitionsOrGlobal(kafkaStreams, request, partitions);
+ IntegrationTestUtils.iqv2WaitForResult(
+ kafkaStreams,
+ request
+ );
makeAssertions(
partitions,
@@ -473,9 +500,6 @@ public class IQv2StoreIntegrationTest {
}
assertThat(queryResult.isSuccess(), is(true));
- // TODO: position not implemented
- assertThat(queryResult.getPosition(), is(nullValue()));
-
assertThrows(IllegalArgumentException.class, queryResult::getFailureReason);
assertThrows(IllegalArgumentException.class, queryResult::getFailureMessage);
@@ -483,17 +507,25 @@ public class IQv2StoreIntegrationTest {
assertThat(queryResult.getExecutionInfo(), is(empty()));
});
+ assertThat(result.getPosition(), is(INPUT_POSITION));
}
public void shouldCollectExecutionInfo() {
final PingQuery query = new PingQuery();
- final StateQueryRequest<Boolean> request =
- inStore(STORE_NAME).withQuery(query).enableExecutionInfo();
final Set<Integer> partitions = mkSet(0, 1);
+ final StateQueryRequest<Boolean> request =
+ inStore(STORE_NAME)
+ .withQuery(query)
+ .enableExecutionInfo()
+ .withPartitions(partitions)
+ .withPositionBound(PositionBound.at(INPUT_POSITION));
final StateQueryResult<Boolean> result =
- IntegrationTestUtils.iqv2WaitForPartitionsOrGlobal(kafkaStreams, request, partitions);
+ IntegrationTestUtils.iqv2WaitForResult(
+ kafkaStreams,
+ request
+ );
makeAssertions(
partitions,
@@ -505,12 +537,19 @@ public class IQv2StoreIntegrationTest {
public void shouldCollectExecutionInfoUnderFailure() {
final UnknownQuery query = new UnknownQuery();
- final StateQueryRequest<Void> request =
- inStore(STORE_NAME).withQuery(query).enableExecutionInfo();
final Set<Integer> partitions = mkSet(0, 1);
+ final StateQueryRequest<Void> request =
+ inStore(STORE_NAME)
+ .withQuery(query)
+ .enableExecutionInfo()
+ .withPartitions(partitions)
+ .withPositionBound(PositionBound.at(INPUT_POSITION));
final StateQueryResult<Void> result =
- IntegrationTestUtils.iqv2WaitForPartitionsOrGlobal(kafkaStreams, request, partitions);
+ IntegrationTestUtils.iqv2WaitForResult(
+ kafkaStreams,
+ request
+ );
makeAssertions(
partitions,
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
index 6cfc5d4..5f527c8 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
@@ -51,6 +51,8 @@ import org.apache.kafka.streams.processor.internals.StreamThread;
import org.apache.kafka.streams.processor.internals.ThreadStateTransitionValidator;
import org.apache.kafka.streams.processor.internals.assignment.AssignorConfiguration.AssignmentListener;
import org.apache.kafka.streams.processor.internals.namedtopology.KafkaStreamsNamedTopologyWrapper;
+import org.apache.kafka.streams.query.FailureReason;
+import org.apache.kafka.streams.query.QueryResult;
import org.apache.kafka.streams.query.StateQueryRequest;
import org.apache.kafka.streams.query.StateQueryResult;
import org.apache.kafka.streams.state.QueryableStoreType;
@@ -91,6 +93,7 @@ import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
+import static org.apache.kafka.common.utils.Utils.sleep;
import static org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout;
import static org.apache.kafka.test.TestUtils.waitForCondition;
import static org.hamcrest.MatcherAssert.assertThat;
@@ -141,6 +144,68 @@ public class IntegrationTestUtils {
throw new TimeoutException("The query never returned the desired partitions");
}
+ /**
+ * Repeatedly runs the query until the response is valid and then returns the response.
+ * <p>
+ * Validity in this case means that the response position is up to the specified bound.
+ * <p>
+ * Once position bounding is generally supported, we should migrate tests to wait on the
+ * expected response position.
+ */
+ public static <R> StateQueryResult<R> iqv2WaitForResult(
+ final KafkaStreams kafkaStreams,
+ final StateQueryRequest<R> request) {
+
+ final long start = System.currentTimeMillis();
+ final long deadline = start + DEFAULT_TIMEOUT;
+
+ StateQueryResult<R> result;
+ do {
+ if (Thread.currentThread().isInterrupted()) {
+ fail("Test was interrupted.");
+ }
+
+ result = kafkaStreams.query(request);
+ final LinkedList<QueryResult<R>> allResults = getAllResults(result);
+
+ if (allResults.isEmpty()) {
+ sleep(100L);
+ } else {
+ final boolean needToWait = allResults
+ .stream()
+ .anyMatch(IntegrationTestUtils::needToWait);
+ if (needToWait) {
+ sleep(100L);
+ } else {
+ return result;
+ }
+ }
+ } while (System.currentTimeMillis() < deadline);
+
+ throw new TimeoutException(
+ "The query never returned within the bound. Last result: "
+ + result
+ );
+ }
+
+ private static <R> LinkedList<QueryResult<R>> getAllResults(
+ final StateQueryResult<R> result) {
+ final LinkedList<QueryResult<R>> allResults =
+ new LinkedList<>(result.getPartitionResults().values());
+ if (result.getGlobalResult() != null) {
+ allResults.add(result.getGlobalResult());
+ }
+ return allResults;
+ }
+
+ private static <R> boolean needToWait(final QueryResult<R> queryResult) {
+ return queryResult.isFailure()
+ && (
+ FailureReason.NOT_UP_TO_BOUND.equals(queryResult.getFailureReason())
+ || FailureReason.NOT_PRESENT.equals(queryResult.getFailureReason())
+ );
+ }
+
/*
* Records state transition for StreamThread
*/