You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by vv...@apache.org on 2021/12/08 22:52:23 UTC

[kafka] 01/01: KAFKA-13522: add position tracking and bounding to IQv2

This is an automated email from the ASF dual-hosted git repository.

vvcephei pushed a commit to branch iqv2-position-api
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit bcfd086517ff94cd6e8e84421c9b3716393e622a
Author: John Roesler <vv...@apache.org>
AuthorDate: Wed Dec 8 16:12:08 2021 -0600

    KAFKA-13522: add position tracking and bounding to IQv2
---
 .../org/apache/kafka/streams/KafkaStreams.java     | 43 ++++++++---
 .../org/apache/kafka/streams/query/Position.java   | 12 ++-
 .../kafka/streams/query/StateQueryResult.java      | 12 ++-
 .../state/internals/CachingKeyValueStore.java      |  4 +-
 .../state/internals/InMemoryKeyValueStore.java     |  7 +-
 .../state/internals/InMemorySessionStore.java      |  9 ++-
 .../state/internals/InMemoryWindowStore.java       |  4 +-
 .../streams/state/internals/MemoryLRUCache.java    |  7 ++
 .../state/internals/MemoryNavigableLRUCache.java   |  4 +-
 .../state/internals/RocksDBSessionStore.java       |  9 ++-
 .../streams/state/internals/RocksDBStore.java      | 17 +++--
 .../state/internals/RocksDBWindowStore.java        |  9 ++-
 .../streams/state/internals/StoreQueryUtils.java   | 48 +++++++++++-
 .../integration/IQv2StoreIntegrationTest.java      | 87 ++++++++++++++++------
 .../integration/utils/IntegrationTestUtils.java    | 65 ++++++++++++++++
 15 files changed, 274 insertions(+), 63 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
index 0169dd9..c095b69 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
@@ -1764,19 +1764,33 @@ public class KafkaStreams implements AutoCloseable {
             );
         }
         final StateQueryResult<R> result = new StateQueryResult<>();
+        final Set<Integer> handledPartitions = new HashSet<>();
 
         final Map<String, StateStore> globalStateStores = topologyMetadata.globalStateStores();
         if (globalStateStores.containsKey(storeName)) {
-            final StateStore store = globalStateStores.get(storeName);
-            final QueryResult<R> r =
-                store.query(
-                    request.getQuery(),
-                    request.getPositionBound(),
-                    request.executionInfoEnabled()
-                );
-            result.setGlobalResult(r);
+            // Global stores pose one significant problem
+            // for IQv2: when they start up, they skip the regular
+            // ingest pipeline and instead use the "restoration" pipeline
+            // to read up until the current end offset. Then, they switch
+            // over to the regular ingest pipeline.
+            //
+            // IQv2 position tracking expects to track the position of each
+            // record from the input topic through the ingest pipeline and then
+            // get the position headers through the restoration pipeline via the
+            // changelog topic. The fact that global stores "restore" the input topic
+            // instead of ingesting it violates our expectations.
+            //
+            // It has also caused other problems, so we may want to consider
+            // switching the global store processing to use the normal paradigm
+            // rather than adding special-case logic to track positions in global
+            // stores.
+            result.setGlobalResult(
+                QueryResult.forFailure(
+                    FailureReason.UNKNOWN_QUERY_TYPE,
+                    "Global stores do not yet support the KafkaStreams#query API. Use KafkaStreams#store instead."
+                )
+            );
         } else {
-            final Set<Integer> handledPartitions = new HashSet<>();
 
             for (final StreamThread thread : threads) {
                 final Map<TaskId, Task> tasks = thread.allTasks();
@@ -1828,6 +1842,17 @@ public class KafkaStreams implements AutoCloseable {
             }
         }
 
+        if (!request.isAllPartitions()) {
+            for (final Integer partition : request.getPartitions()) {
+                if (!result.getPartitionResults().containsKey(partition)){
+                    result.addResult(partition, QueryResult.forFailure(
+                        FailureReason.NOT_PRESENT,
+                        "The requested partition was not present at the time of the query."
+                    ));
+                }
+            }
+        }
+
         return result;
     }
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/query/Position.java b/streams/src/main/java/org/apache/kafka/streams/query/Position.java
index 951866c..88f4b49 100644
--- a/streams/src/main/java/org/apache/kafka/streams/query/Position.java
+++ b/streams/src/main/java/org/apache/kafka/streams/query/Position.java
@@ -93,8 +93,7 @@ public class Position {
     }
 
     /**
-     * Create a new, structurally independent Position that is the result of merging two other
-     * Positions.
+     * Merges the provided Position into the current instance.
      * <p>
      * If both Positions contain the same topic -> partition -> offset mapping, the resulting
      * Position will contain a mapping with the larger of the two offsets.
@@ -103,12 +102,10 @@ public class Position {
         if (other == null) {
             return this;
         } else {
-            final ConcurrentHashMap<String, ConcurrentHashMap<Integer, Long>> copy =
-                deepCopy(position);
             for (final Entry<String, ConcurrentHashMap<Integer, Long>> entry : other.position.entrySet()) {
                 final String topic = entry.getKey();
                 final Map<Integer, Long> partitionMap =
-                    copy.computeIfAbsent(topic, k -> new ConcurrentHashMap<>());
+                    position.computeIfAbsent(topic, k -> new ConcurrentHashMap<>());
                 for (final Entry<Integer, Long> partitionOffset : entry.getValue().entrySet()) {
                     final Integer partition = partitionOffset.getKey();
                     final Long offset = partitionOffset.getValue();
@@ -118,7 +115,7 @@ public class Position {
                     }
                 }
             }
-            return new Position(copy);
+            return this;
         }
     }
 
@@ -133,7 +130,8 @@ public class Position {
      * Return the partition -> offset mapping for a specific topic.
      */
     public Map<Integer, Long> getBound(final String topic) {
-        return Collections.unmodifiableMap(position.get(topic));
+        final ConcurrentHashMap<Integer, Long> bound = position.get(topic);
+        return bound == null ? Collections.emptyMap() : Collections.unmodifiableMap(bound);
     }
 
     private static ConcurrentHashMap<String, ConcurrentHashMap<Integer, Long>> deepCopy(
diff --git a/streams/src/main/java/org/apache/kafka/streams/query/StateQueryResult.java b/streams/src/main/java/org/apache/kafka/streams/query/StateQueryResult.java
index 8b93bd6..c5eaa10 100644
--- a/streams/src/main/java/org/apache/kafka/streams/query/StateQueryResult.java
+++ b/streams/src/main/java/org/apache/kafka/streams/query/StateQueryResult.java
@@ -97,11 +97,15 @@ public class StateQueryResult<R> {
      * prior observations.
      */
     public Position getPosition() {
-        Position position = Position.emptyPosition();
-        for (final QueryResult<R> r : partitionResults.values()) {
-            position = position.merge(r.getPosition());
+        if (globalResult != null) {
+            return globalResult.getPosition();
+        } else {
+            final Position position = Position.emptyPosition();
+            for (final QueryResult<R> r : partitionResults.values()) {
+                position.merge(r.getPosition());
+            }
+            return position;
         }
-        return position;
     }
 
     @Override
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
index df22e8c..66683ea 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
@@ -107,10 +107,10 @@ public class CachingKeyValueStore
             // we can skip flushing to downstream as well as writing to underlying store
             if (rawNewValue != null || rawOldValue != null) {
                 // we need to get the old values if needed, and then put to store, and then flush
-                wrapped().put(entry.key(), entry.newValue());
-
                 final ProcessorRecordContext current = context.recordContext();
                 context.setRecordContext(entry.entry().context());
+                wrapped().put(entry.key(), entry.newValue());
+
                 try {
                     flushListener.apply(
                         new Record<>(
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java
index 41977cf..5f47c3d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java
@@ -45,13 +45,12 @@ public class InMemoryKeyValueStore implements KeyValueStore<Bytes, byte[]> {
 
     private final String name;
     private final NavigableMap<Bytes, byte[]> map = new TreeMap<>();
+    private final Position position = Position.emptyPosition();
     private volatile boolean open = false;
     private StateStoreContext context;
-    private Position position;
 
     public InMemoryKeyValueStore(final String name) {
         this.name = name;
-        this.position = Position.emptyPosition();
     }
 
     @Override
@@ -102,7 +101,9 @@ public class InMemoryKeyValueStore implements KeyValueStore<Bytes, byte[]> {
             query,
             positionBound,
             collectExecutionInfo,
-            this
+            this,
+            position,
+            context.taskId().partition()
         );
     }
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemorySessionStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemorySessionStore.java
index d09ab0f..cf351e3 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemorySessionStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemorySessionStore.java
@@ -317,7 +317,14 @@ public class InMemorySessionStore implements SessionStore<Bytes, byte[]> {
     @Override
     public <R> QueryResult<R> query(final Query<R> query, final PositionBound positionBound,
         final boolean collectExecutionInfo) {
-        return StoreQueryUtils.handleBasicQueries(query, positionBound, collectExecutionInfo, this);
+        return StoreQueryUtils.handleBasicQueries(
+            query,
+            positionBound,
+            collectExecutionInfo,
+            this,
+            position,
+            context.taskId().partition()
+        );
     }
 
     @Override
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java
index 46f4973..1b49e9e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java
@@ -357,7 +357,9 @@ public class InMemoryWindowStore implements WindowStore<Bytes, byte[]> {
             query,
             positionBound,
             collectExecutionInfo,
-            this
+            this,
+            position,
+            context.taskId().partition()
         );
     }
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java
index 22f1215..c81b527 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java
@@ -22,6 +22,7 @@ import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.query.Position;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.KeyValueStore;
 
@@ -35,6 +36,9 @@ import java.util.Objects;
  */
 public class MemoryLRUCache implements KeyValueStore<Bytes, byte[]> {
 
+    protected StateStoreContext context;
+    protected Position position = Position.emptyPosition();
+
     public interface EldestEntryRemovalListener {
         void apply(Bytes key, byte[] value);
     }
@@ -95,6 +99,7 @@ public class MemoryLRUCache implements KeyValueStore<Bytes, byte[]> {
             put(Bytes.wrap(key), value);
             restoring = false;
         });
+        this.context = context;
     }
 
     @Override
@@ -122,6 +127,7 @@ public class MemoryLRUCache implements KeyValueStore<Bytes, byte[]> {
         } else {
             this.map.put(key, value);
         }
+        StoreQueryUtils.updatePosition(position, context);
     }
 
     @Override
@@ -144,6 +150,7 @@ public class MemoryLRUCache implements KeyValueStore<Bytes, byte[]> {
     @Override
     public synchronized byte[] delete(final Bytes key) {
         Objects.requireNonNull(key);
+        StoreQueryUtils.updatePosition(position, context);
         return this.map.remove(key);
     }
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryNavigableLRUCache.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryNavigableLRUCache.java
index eda880e..dd2e8a7 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryNavigableLRUCache.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryNavigableLRUCache.java
@@ -121,7 +121,9 @@ public class MemoryNavigableLRUCache extends MemoryLRUCache {
             query,
             positionBound,
             collectExecutionInfo,
-            this
+            this,
+            position,
+            context.taskId().partition()
         );
     }
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSessionStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSessionStore.java
index c171792..e6e2740 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSessionStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSessionStore.java
@@ -53,7 +53,14 @@ public class RocksDBSessionStore
     @Override
     public <R> QueryResult<R> query(final Query<R> query, final PositionBound positionBound,
         final boolean collectExecutionInfo) {
-        return StoreQueryUtils.handleBasicQueries(query, positionBound, collectExecutionInfo, this);
+        return StoreQueryUtils.handleBasicQueries(
+            query,
+            positionBound,
+            collectExecutionInfo,
+            this,
+            position,
+            stateStoreContext.taskId().partition()
+        );
     }
 
     @Override
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
index 0ab9be9..4f07a15 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
@@ -326,13 +326,18 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]>, BatchWritingS
     }
 
     @Override
-    public <R> QueryResult<R> query(final Query<R> query, final PositionBound positionBound,
-                                    final boolean collectExecutionInfo) {
+    public <R> QueryResult<R> query(
+        final Query<R> query,
+        final PositionBound positionBound,
+        final boolean collectExecutionInfo) {
+
         return StoreQueryUtils.handleBasicQueries(
-                query,
-                positionBound,
-                collectExecutionInfo,
-                this
+            query,
+            positionBound,
+            collectExecutionInfo,
+            this,
+            position,
+            context.taskId().partition()
         );
     }
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java
index efcde31..40415c2 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java
@@ -131,7 +131,14 @@ public class RocksDBWindowStore
     @Override
     public <R> QueryResult<R> query(final Query<R> query, final PositionBound positionBound,
         final boolean collectExecutionInfo) {
-        return StoreQueryUtils.handleBasicQueries(query, positionBound, collectExecutionInfo, this);
+        return StoreQueryUtils.handleBasicQueries(
+            query,
+            positionBound,
+            collectExecutionInfo,
+            this,
+            position,
+            stateStoreContext.taskId().partition()
+        );
     }
 
     private void maybeUpdateSeqnumForDups() {
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreQueryUtils.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreQueryUtils.java
index 6217877..006981e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreQueryUtils.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreQueryUtils.java
@@ -24,6 +24,9 @@ import org.apache.kafka.streams.query.PositionBound;
 import org.apache.kafka.streams.query.Query;
 import org.apache.kafka.streams.query.QueryResult;
 
+import java.util.Map;
+import java.util.Map.Entry;
+
 public final class StoreQueryUtils {
 
     // make this class uninstantiable
@@ -35,13 +38,19 @@ public final class StoreQueryUtils {
         final Query<R> query,
         final PositionBound positionBound,
         final boolean collectExecutionInfo,
-        final StateStore store) {
+        final StateStore store,
+        final Position position,
+        final int partition
+    ) {
 
         final QueryResult<R> result;
         final long start = collectExecutionInfo ? System.nanoTime() : -1L;
-        // TODO: position tracking
         if (query instanceof PingQuery) {
-            result = (QueryResult<R>) QueryResult.forResult(true);
+            if (!isPermitted(position, positionBound, partition)) {
+                result = QueryResult.notUpToBound(position, positionBound, partition);
+            } else {
+                result = (QueryResult<R>) QueryResult.forResult(true);
+            }
         } else {
             result = QueryResult.forUnknownQueryType(query, store);
         }
@@ -50,6 +59,7 @@ public final class StoreQueryUtils {
                 "Handled in " + store.getClass() + " in " + (System.nanoTime() - start) + "ns"
             );
         }
+        result.setPosition(position);
         return result;
     }
 
@@ -62,4 +72,36 @@ public final class StoreQueryUtils {
             position.withComponent(meta.topic(), meta.partition(), meta.offset());
         }
     }
+
+    public static boolean isPermitted(
+        final Position position,
+        final PositionBound positionBound,
+        final int partition
+    ) {
+        if (positionBound.isUnbounded()) {
+            return true;
+        } else {
+            final Position bound = positionBound.position();
+            for (final String topic : bound.getTopics()) {
+                final Map<Integer, Long> partitionBounds = bound.getBound(topic);
+                final Map<Integer, Long> seenPartitionBounds = position.getBound(topic);
+                if (!partitionBounds.containsKey(partition)) {
+                    // this topic isn't bounded for our partition, so just skip over it.
+                } else {
+                    if (seenPartitionBounds == null) {
+                        // we haven't seen a topic that is bounded for our partition
+                        return false;
+                    } else if (!seenPartitionBounds.containsKey(partition)) {
+                        // we haven't seen a partition that we have a bound for
+                        return false;
+                    } else if (seenPartitionBounds.get(partition) < partitionBounds.get(
+                        partition)) {
+                        // our current position is behind the bound
+                        return false;
+                    }
+                }
+            }
+            return true;
+        }
+    }
 }
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/IQv2StoreIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/IQv2StoreIntegrationTest.java
index 3a6adaf..62ff17d 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/IQv2StoreIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/IQv2StoreIntegrationTest.java
@@ -37,6 +37,7 @@ import org.apache.kafka.streams.kstream.SessionWindows;
 import org.apache.kafka.streams.kstream.TimeWindows;
 import org.apache.kafka.streams.query.FailureReason;
 import org.apache.kafka.streams.query.Position;
+import org.apache.kafka.streams.query.PositionBound;
 import org.apache.kafka.streams.query.Query;
 import org.apache.kafka.streams.query.QueryResult;
 import org.apache.kafka.streams.query.StateQueryRequest;
@@ -85,7 +86,6 @@ import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.is;
 import static org.hamcrest.Matchers.matchesPattern;
 import static org.hamcrest.Matchers.not;
-import static org.hamcrest.Matchers.nullValue;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 
 @Category({IntegrationTest.class})
@@ -248,6 +248,7 @@ public class IQv2StoreIntegrationTest {
     public static void before()
         throws InterruptedException, IOException, ExecutionException, TimeoutException {
         CLUSTER.start();
+        CLUSTER.deleteAllTopicsAndWait(60 * 1000L);
         final int partitions = 2;
         CLUSTER.createTopic(INPUT_TOPIC_NAME, partitions, 1);
 
@@ -295,8 +296,14 @@ public class IQv2StoreIntegrationTest {
 
     @Before
     public void beforeTest() {
-        final StreamsBuilder builder = new StreamsBuilder();
         final StoreSupplier<?> supplier = storeToTest.supplier();
+        final Properties streamsConfig = streamsConfiguration(
+            cache,
+            log,
+            storeToTest.name()
+        );
+
+        final StreamsBuilder builder = new StreamsBuilder();
         if (supplier instanceof KeyValueBytesStoreSupplier) {
             final Materialized<Integer, Integer, KeyValueStore<Bytes, byte[]>> materialized =
                 Materialized.as((KeyValueBytesStoreSupplier) supplier);
@@ -389,13 +396,10 @@ public class IQv2StoreIntegrationTest {
 
         // Don't need to wait for running, since tests can use iqv2 to wait until they
         // get a valid response.
+
         kafkaStreams =
             IntegrationTestUtils.getStartedStreams(
-                streamsConfiguration(
-                    cache,
-                    log,
-                    supplier.getClass().getSimpleName()
-                ),
+                streamsConfig,
                 builder,
                 true
             );
@@ -414,10 +418,28 @@ public class IQv2StoreIntegrationTest {
 
     @Test
     public void verifyStore() {
-        shouldRejectUnknownQuery();
-        shouldHandlePingQuery();
-        shouldCollectExecutionInfo();
-        shouldCollectExecutionInfoUnderFailure();
+        if (storeToTest.global()) {
+            globalShouldRejectAllQueries();
+        } else {
+            shouldRejectUnknownQuery();
+            shouldHandlePingQuery();
+            shouldCollectExecutionInfo();
+            shouldCollectExecutionInfoUnderFailure();
+        }
+    }
+
+    private void globalShouldRejectAllQueries() {
+        final PingQuery query = new PingQuery();
+        final StateQueryRequest<Boolean> request = inStore(STORE_NAME).withQuery(query);
+
+        final StateQueryResult<Boolean> result = kafkaStreams.query(request);
+
+        assertThat(result.getGlobalResult().isFailure(), is(true));
+        assertThat(result.getGlobalResult().getFailureReason(),
+            is(FailureReason.UNKNOWN_QUERY_TYPE));
+        assertThat(result.getGlobalResult().getFailureMessage(),
+            is("Global stores do not yet support the KafkaStreams#query API."
+                + " Use KafkaStreams#store instead."));
     }
 
     public void shouldRejectUnknownQuery() {
@@ -435,7 +457,6 @@ public class IQv2StoreIntegrationTest {
             queryResult -> {
                 assertThat(queryResult.isFailure(), is(true));
                 assertThat(queryResult.isSuccess(), is(false));
-                assertThat(queryResult.getPosition(), is(nullValue()));
                 assertThat(queryResult.getFailureReason(),
                     is(FailureReason.UNKNOWN_QUERY_TYPE));
                 assertThat(queryResult.getFailureMessage(),
@@ -456,12 +477,18 @@ public class IQv2StoreIntegrationTest {
     public void shouldHandlePingQuery() {
 
         final PingQuery query = new PingQuery();
-        final StateQueryRequest<Boolean> request =
-            inStore(STORE_NAME).withQuery(query);
         final Set<Integer> partitions = mkSet(0, 1);
+        final StateQueryRequest<Boolean> request =
+            inStore(STORE_NAME)
+                .withQuery(query)
+                .withPartitions(partitions)
+                .withPositionBound(PositionBound.at(INPUT_POSITION));
 
         final StateQueryResult<Boolean> result =
-            IntegrationTestUtils.iqv2WaitForPartitionsOrGlobal(kafkaStreams, request, partitions);
+            IntegrationTestUtils.iqv2WaitForResult(
+                kafkaStreams,
+                request
+            );
 
         makeAssertions(
             partitions,
@@ -473,9 +500,6 @@ public class IQv2StoreIntegrationTest {
                 }
                 assertThat(queryResult.isSuccess(), is(true));
 
-                // TODO: position not implemented
-                assertThat(queryResult.getPosition(), is(nullValue()));
-
                 assertThrows(IllegalArgumentException.class, queryResult::getFailureReason);
                 assertThrows(IllegalArgumentException.class, queryResult::getFailureMessage);
 
@@ -483,17 +507,25 @@ public class IQv2StoreIntegrationTest {
 
                 assertThat(queryResult.getExecutionInfo(), is(empty()));
             });
+        assertThat(result.getPosition(), is(INPUT_POSITION));
     }
 
     public void shouldCollectExecutionInfo() {
 
         final PingQuery query = new PingQuery();
-        final StateQueryRequest<Boolean> request =
-            inStore(STORE_NAME).withQuery(query).enableExecutionInfo();
         final Set<Integer> partitions = mkSet(0, 1);
+        final StateQueryRequest<Boolean> request =
+            inStore(STORE_NAME)
+                .withQuery(query)
+                .enableExecutionInfo()
+                .withPartitions(partitions)
+                .withPositionBound(PositionBound.at(INPUT_POSITION));
 
         final StateQueryResult<Boolean> result =
-            IntegrationTestUtils.iqv2WaitForPartitionsOrGlobal(kafkaStreams, request, partitions);
+            IntegrationTestUtils.iqv2WaitForResult(
+                kafkaStreams,
+                request
+            );
 
         makeAssertions(
             partitions,
@@ -505,12 +537,19 @@ public class IQv2StoreIntegrationTest {
     public void shouldCollectExecutionInfoUnderFailure() {
 
         final UnknownQuery query = new UnknownQuery();
-        final StateQueryRequest<Void> request =
-            inStore(STORE_NAME).withQuery(query).enableExecutionInfo();
         final Set<Integer> partitions = mkSet(0, 1);
+        final StateQueryRequest<Void> request =
+            inStore(STORE_NAME)
+                .withQuery(query)
+                .enableExecutionInfo()
+                .withPartitions(partitions)
+                .withPositionBound(PositionBound.at(INPUT_POSITION));
 
         final StateQueryResult<Void> result =
-            IntegrationTestUtils.iqv2WaitForPartitionsOrGlobal(kafkaStreams, request, partitions);
+            IntegrationTestUtils.iqv2WaitForResult(
+                kafkaStreams,
+                request
+            );
 
         makeAssertions(
             partitions,
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
index 6cfc5d4..5f527c8 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
@@ -51,6 +51,8 @@ import org.apache.kafka.streams.processor.internals.StreamThread;
 import org.apache.kafka.streams.processor.internals.ThreadStateTransitionValidator;
 import org.apache.kafka.streams.processor.internals.assignment.AssignorConfiguration.AssignmentListener;
 import org.apache.kafka.streams.processor.internals.namedtopology.KafkaStreamsNamedTopologyWrapper;
+import org.apache.kafka.streams.query.FailureReason;
+import org.apache.kafka.streams.query.QueryResult;
 import org.apache.kafka.streams.query.StateQueryRequest;
 import org.apache.kafka.streams.query.StateQueryResult;
 import org.apache.kafka.streams.state.QueryableStoreType;
@@ -91,6 +93,7 @@ import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 import java.util.stream.Collectors;
 
+import static org.apache.kafka.common.utils.Utils.sleep;
 import static org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout;
 import static org.apache.kafka.test.TestUtils.waitForCondition;
 import static org.hamcrest.MatcherAssert.assertThat;
@@ -141,6 +144,68 @@ public class IntegrationTestUtils {
         throw new TimeoutException("The query never returned the desired partitions");
     }
 
+    /**
+     * Repeatedly runs the query until the response is valid and then returns the response.
+     * <p>
+     * Validity in this case means that the response position is up to the specified bound.
+     * <p>
+     * Once position bounding is generally supported, we should migrate tests to wait on the
+     * expected response position.
+     */
+    public static <R> StateQueryResult<R> iqv2WaitForResult(
+        final KafkaStreams kafkaStreams,
+        final StateQueryRequest<R> request) {
+
+        final long start = System.currentTimeMillis();
+        final long deadline = start + DEFAULT_TIMEOUT;
+
+        StateQueryResult<R> result;
+        do {
+            if (Thread.currentThread().isInterrupted()) {
+                fail("Test was interrupted.");
+            }
+
+            result = kafkaStreams.query(request);
+            final LinkedList<QueryResult<R>> allResults = getAllResults(result);
+
+            if (allResults.isEmpty()) {
+                sleep(100L);
+            } else {
+                final boolean needToWait = allResults
+                    .stream()
+                    .anyMatch(IntegrationTestUtils::needToWait);
+                if (needToWait) {
+                    sleep(100L);
+                } else {
+                    return result;
+                }
+            }
+        } while (System.currentTimeMillis() < deadline);
+
+        throw new TimeoutException(
+            "The query never returned within the bound. Last result: "
+            + result
+        );
+    }
+
+    private static <R> LinkedList<QueryResult<R>> getAllResults(
+        final StateQueryResult<R> result) {
+        final LinkedList<QueryResult<R>> allResults =
+            new LinkedList<>(result.getPartitionResults().values());
+        if (result.getGlobalResult() != null) {
+            allResults.add(result.getGlobalResult());
+        }
+        return allResults;
+    }
+
+    private static <R> boolean needToWait(final QueryResult<R> queryResult) {
+        return queryResult.isFailure()
+            && (
+            FailureReason.NOT_UP_TO_BOUND.equals(queryResult.getFailureReason())
+                || FailureReason.NOT_PRESENT.equals(queryResult.getFailureReason())
+        );
+    }
+
     /*
      * Records state transition for StreamThread
      */