You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by vv...@apache.org on 2021/12/11 07:04:24 UTC

[kafka] branch trunk updated: KAFKA-13522: add position tracking and bounding to IQv2 (#11581)

This is an automated email from the ASF dual-hosted git repository.

vvcephei pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new acd1f9c  KAFKA-13522: add position tracking and bounding to IQv2 (#11581)
acd1f9c is described below

commit acd1f9c5631ed2aec2d6ab238e6b81c1a9eb47a2
Author: John Roesler <vv...@users.noreply.github.com>
AuthorDate: Sat Dec 11 01:00:59 2021 -0600

    KAFKA-13522: add position tracking and bounding to IQv2 (#11581)
    
    * Fill in the Position response in the IQv2 result.
    * Enforce PositionBound in IQv2 queries.
    * Update integration testing approach to leverage consistent queries.
    
    Reviewers: Patrick Stuedi <ps...@apache.org>, Vicky Papavasileiou <vp...@confluent.io>, Guozhang Wang <gu...@apache.org>
---
 .../org/apache/kafka/streams/KafkaStreams.java     | 42 ++++++----
 .../org/apache/kafka/streams/query/Position.java   | 18 +++--
 .../apache/kafka/streams/query/PositionBound.java  | 37 ++-------
 .../kafka/streams/query/StateQueryResult.java      | 12 ++-
 .../state/internals/CachingKeyValueStore.java      |  4 +-
 .../state/internals/InMemoryKeyValueStore.java     |  7 +-
 .../state/internals/InMemorySessionStore.java      |  9 ++-
 .../state/internals/InMemoryWindowStore.java       |  4 +-
 .../streams/state/internals/MemoryLRUCache.java    |  7 ++
 .../state/internals/MemoryNavigableLRUCache.java   |  4 +-
 .../streams/state/internals/PositionSerde.java     |  4 +-
 .../state/internals/RocksDBSessionStore.java       |  9 ++-
 .../streams/state/internals/RocksDBStore.java      | 17 ++--
 .../state/internals/RocksDBWindowStore.java        |  9 ++-
 .../streams/state/internals/StoreQueryUtils.java   | 39 ++++++++-
 .../ConsistencyVectorIntegrationTest.java          |  4 +-
 .../streams/integration/IQv2IntegrationTest.java   | 20 +++--
 .../integration/IQv2StoreIntegrationTest.java      | 92 ++++++++++++++++------
 .../integration/utils/IntegrationTestUtils.java    | 78 ++++++++++++++++--
 .../kafka/streams/query/PositionBoundTest.java     |  4 +-
 .../apache/kafka/streams/query/PositionTest.java   | 28 +++----
 .../AbstractRocksDBSegmentedBytesStoreTest.java    | 14 ++--
 .../ChangeLoggingKeyValueBytesStoreTest.java       |  4 +-
 .../streams/state/internals/RocksDBStoreTest.java  | 14 ++--
 24 files changed, 327 insertions(+), 153 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
index ecc1d01..a8f58d8 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
@@ -1771,17 +1771,14 @@ public class KafkaStreams implements AutoCloseable {
 
         final Map<String, StateStore> globalStateStores = topologyMetadata.globalStateStores();
         if (globalStateStores.containsKey(storeName)) {
-            final StateStore store = globalStateStores.get(storeName);
-            final QueryResult<R> r =
-                store.query(
-                    request.getQuery(),
-                    request.getPositionBound(),
-                    request.executionInfoEnabled()
-                );
-            result.setGlobalResult(r);
+            // See KAFKA-13523
+            result.setGlobalResult(
+                QueryResult.forFailure(
+                    FailureReason.UNKNOWN_QUERY_TYPE,
+                    "Global stores do not yet support the KafkaStreams#query API. Use KafkaStreams#store instead."
+                )
+            );
         } else {
-            final Set<Integer> handledPartitions = new HashSet<>();
-
             for (final StreamThread thread : threads) {
                 final Map<TaskId, Task> tasks = thread.allTasks();
                 for (final Entry<TaskId, Task> entry : tasks.entrySet()) {
@@ -1818,20 +1815,31 @@ public class KafkaStreams implements AutoCloseable {
                                 );
                                 result.addResult(partition, r);
                             }
-                        }
 
-                        // optimization: if we have handled all the requested partitions,
-                        // we can return right away.
-                        handledPartitions.add(partition);
-                        if (!request.isAllPartitions()
-                            && handledPartitions.containsAll(request.getPartitions())) {
-                            return result;
+
+                            // optimization: if we have handled all the requested partitions,
+                            // we can return right away.
+                            if (!request.isAllPartitions()
+                                && result.getPartitionResults().keySet().containsAll(request.getPartitions())) {
+                                return result;
+                            }
                         }
                     }
                 }
             }
         }
 
+        if (!request.isAllPartitions()) {
+            for (final Integer partition : request.getPartitions()) {
+                if (!result.getPartitionResults().containsKey(partition)) {
+                    result.addResult(partition, QueryResult.forFailure(
+                        FailureReason.NOT_PRESENT,
+                        "The requested partition was not present at the time of the query."
+                    ));
+                }
+            }
+        }
+
         return result;
     }
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/query/Position.java b/streams/src/main/java/org/apache/kafka/streams/query/Position.java
index 951866c..52ceb23 100644
--- a/streams/src/main/java/org/apache/kafka/streams/query/Position.java
+++ b/streams/src/main/java/org/apache/kafka/streams/query/Position.java
@@ -93,8 +93,7 @@ public class Position {
     }
 
     /**
-     * Create a new, structurally independent Position that is the result of merging two other
-     * Positions.
+     * Merges the provided Position into the current instance.
      * <p>
      * If both Positions contain the same topic -> partition -> offset mapping, the resulting
      * Position will contain a mapping with the larger of the two offsets.
@@ -103,12 +102,10 @@ public class Position {
         if (other == null) {
             return this;
         } else {
-            final ConcurrentHashMap<String, ConcurrentHashMap<Integer, Long>> copy =
-                deepCopy(position);
             for (final Entry<String, ConcurrentHashMap<Integer, Long>> entry : other.position.entrySet()) {
                 final String topic = entry.getKey();
                 final Map<Integer, Long> partitionMap =
-                    copy.computeIfAbsent(topic, k -> new ConcurrentHashMap<>());
+                    position.computeIfAbsent(topic, k -> new ConcurrentHashMap<>());
                 for (final Entry<Integer, Long> partitionOffset : entry.getValue().entrySet()) {
                     final Integer partition = partitionOffset.getKey();
                     final Long offset = partitionOffset.getValue();
@@ -118,7 +115,7 @@ public class Position {
                     }
                 }
             }
-            return new Position(copy);
+            return this;
         }
     }
 
@@ -132,8 +129,9 @@ public class Position {
     /**
      * Return the partition -> offset mapping for a specific topic.
      */
-    public Map<Integer, Long> getBound(final String topic) {
-        return Collections.unmodifiableMap(position.get(topic));
+    public Map<Integer, Long> getPartitionPositions(final String topic) {
+        final ConcurrentHashMap<Integer, Long> bound = position.get(topic);
+        return bound == null ? Collections.emptyMap() : Collections.unmodifiableMap(bound);
     }
 
     private static ConcurrentHashMap<String, ConcurrentHashMap<Integer, Long>> deepCopy(
@@ -174,4 +172,8 @@ public class Position {
         throw new UnsupportedOperationException(
             "This mutable object is not suitable as a hash key");
     }
+
+    public boolean isEmpty() {
+        return position.isEmpty();
+    }
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/query/PositionBound.java b/streams/src/main/java/org/apache/kafka/streams/query/PositionBound.java
index 46d9376..f5ec3c8 100644
--- a/streams/src/main/java/org/apache/kafka/streams/query/PositionBound.java
+++ b/streams/src/main/java/org/apache/kafka/streams/query/PositionBound.java
@@ -30,63 +30,42 @@ import java.util.Objects;
 public class PositionBound {
 
     private final Position position;
-    private final boolean unbounded;
 
-    private PositionBound(final Position position, final boolean unbounded) {
-        if (unbounded && position != null) {
-            throw new IllegalArgumentException();
-        } else if (position != null) {
-            this.position = position.copy();
-            this.unbounded = false;
-        } else {
-            this.position = null;
-            this.unbounded = unbounded;
-        }
+    private PositionBound(final Position position) {
+        this.position = position.copy();
     }
 
     /**
      * Creates a new PositionBound representing "no bound"
      */
     public static PositionBound unbounded() {
-        return new PositionBound(null, true);
+        return new PositionBound(Position.emptyPosition());
     }
 
     /**
      * Creates a new PositionBound representing a specific position.
      */
     public static PositionBound at(final Position position) {
-        return new PositionBound(position, false);
+        return new PositionBound(position);
     }
 
     /**
      * Returns true iff this object specifies that there is no position bound.
      */
     public boolean isUnbounded() {
-        return unbounded;
+        return position.isEmpty();
     }
 
     /**
      * Returns the specific position of this bound.
-     *
-     * @throws IllegalArgumentException if this is an "unbounded" position.
      */
     public Position position() {
-        if (unbounded) {
-            throw new IllegalArgumentException(
-                "Cannot get the position of an unbounded PositionBound."
-            );
-        } else {
-            return position;
-        }
+        return position;
     }
 
     @Override
     public String toString() {
-        if (isUnbounded()) {
-            return "PositionBound{unbounded}";
-        } else {
-            return "PositionBound{position=" + position + '}';
-        }
+        return "PositionBound{position=" + position + '}';
     }
 
     @Override
@@ -98,7 +77,7 @@ public class PositionBound {
             return false;
         }
         final PositionBound that = (PositionBound) o;
-        return unbounded == that.unbounded && Objects.equals(position, that.position);
+        return Objects.equals(position, that.position);
     }
 
     @Override
diff --git a/streams/src/main/java/org/apache/kafka/streams/query/StateQueryResult.java b/streams/src/main/java/org/apache/kafka/streams/query/StateQueryResult.java
index 8b93bd6..c5eaa10 100644
--- a/streams/src/main/java/org/apache/kafka/streams/query/StateQueryResult.java
+++ b/streams/src/main/java/org/apache/kafka/streams/query/StateQueryResult.java
@@ -97,11 +97,15 @@ public class StateQueryResult<R> {
      * prior observations.
      */
     public Position getPosition() {
-        Position position = Position.emptyPosition();
-        for (final QueryResult<R> r : partitionResults.values()) {
-            position = position.merge(r.getPosition());
+        if (globalResult != null) {
+            return globalResult.getPosition();
+        } else {
+            final Position position = Position.emptyPosition();
+            for (final QueryResult<R> r : partitionResults.values()) {
+                position.merge(r.getPosition());
+            }
+            return position;
         }
-        return position;
     }
 
     @Override
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
index df22e8c..66683ea 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
@@ -107,10 +107,10 @@ public class CachingKeyValueStore
             // we can skip flushing to downstream as well as writing to underlying store
             if (rawNewValue != null || rawOldValue != null) {
                 // we need to get the old values if needed, and then put to store, and then flush
-                wrapped().put(entry.key(), entry.newValue());
-
                 final ProcessorRecordContext current = context.recordContext();
                 context.setRecordContext(entry.entry().context());
+                wrapped().put(entry.key(), entry.newValue());
+
                 try {
                     flushListener.apply(
                         new Record<>(
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java
index 41977cf..5f47c3d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java
@@ -45,13 +45,12 @@ public class InMemoryKeyValueStore implements KeyValueStore<Bytes, byte[]> {
 
     private final String name;
     private final NavigableMap<Bytes, byte[]> map = new TreeMap<>();
+    private final Position position = Position.emptyPosition();
     private volatile boolean open = false;
     private StateStoreContext context;
-    private Position position;
 
     public InMemoryKeyValueStore(final String name) {
         this.name = name;
-        this.position = Position.emptyPosition();
     }
 
     @Override
@@ -102,7 +101,9 @@ public class InMemoryKeyValueStore implements KeyValueStore<Bytes, byte[]> {
             query,
             positionBound,
             collectExecutionInfo,
-            this
+            this,
+            position,
+            context.taskId().partition()
         );
     }
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemorySessionStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemorySessionStore.java
index d09ab0f..cf351e3 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemorySessionStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemorySessionStore.java
@@ -317,7 +317,14 @@ public class InMemorySessionStore implements SessionStore<Bytes, byte[]> {
     @Override
     public <R> QueryResult<R> query(final Query<R> query, final PositionBound positionBound,
         final boolean collectExecutionInfo) {
-        return StoreQueryUtils.handleBasicQueries(query, positionBound, collectExecutionInfo, this);
+        return StoreQueryUtils.handleBasicQueries(
+            query,
+            positionBound,
+            collectExecutionInfo,
+            this,
+            position,
+            context.taskId().partition()
+        );
     }
 
     @Override
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java
index 46f4973..1b49e9e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java
@@ -357,7 +357,9 @@ public class InMemoryWindowStore implements WindowStore<Bytes, byte[]> {
             query,
             positionBound,
             collectExecutionInfo,
-            this
+            this,
+            position,
+            context.taskId().partition()
         );
     }
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java
index 22f1215..c81b527 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java
@@ -22,6 +22,7 @@ import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.query.Position;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.KeyValueStore;
 
@@ -35,6 +36,9 @@ import java.util.Objects;
  */
 public class MemoryLRUCache implements KeyValueStore<Bytes, byte[]> {
 
+    protected StateStoreContext context;
+    protected Position position = Position.emptyPosition();
+
     public interface EldestEntryRemovalListener {
         void apply(Bytes key, byte[] value);
     }
@@ -95,6 +99,7 @@ public class MemoryLRUCache implements KeyValueStore<Bytes, byte[]> {
             put(Bytes.wrap(key), value);
             restoring = false;
         });
+        this.context = context;
     }
 
     @Override
@@ -122,6 +127,7 @@ public class MemoryLRUCache implements KeyValueStore<Bytes, byte[]> {
         } else {
             this.map.put(key, value);
         }
+        StoreQueryUtils.updatePosition(position, context);
     }
 
     @Override
@@ -144,6 +150,7 @@ public class MemoryLRUCache implements KeyValueStore<Bytes, byte[]> {
     @Override
     public synchronized byte[] delete(final Bytes key) {
         Objects.requireNonNull(key);
+        StoreQueryUtils.updatePosition(position, context);
         return this.map.remove(key);
     }
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryNavigableLRUCache.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryNavigableLRUCache.java
index eda880e..dd2e8a7 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryNavigableLRUCache.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryNavigableLRUCache.java
@@ -121,7 +121,9 @@ public class MemoryNavigableLRUCache extends MemoryLRUCache {
             query,
             positionBound,
             collectExecutionInfo,
-            this
+            this,
+            position,
+            context.taskId().partition()
         );
     }
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/PositionSerde.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/PositionSerde.java
index 7628889..7458ac4 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/PositionSerde.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/PositionSerde.java
@@ -78,7 +78,7 @@ public final class PositionSerde {
             arraySize += Integer.SIZE; // topic name length
             arraySize += topicBytes.length; // topic name itself
 
-            final Map<Integer, Long> partitionOffsets = position.getBound(topic);
+            final Map<Integer, Long> partitionOffsets = position.getPartitionPositions(topic);
             arraySize += Integer.SIZE; // Number of PartitionOffset pairs
             arraySize += (Integer.SIZE + Long.SIZE)
                     * partitionOffsets.size(); // partitionOffsets themselves
@@ -93,7 +93,7 @@ public final class PositionSerde {
             buffer.put(topics[i]);
 
             final String topic = entries.get(i);
-            final Map<Integer, Long> partitionOffsets = position.getBound(topic);
+            final Map<Integer, Long> partitionOffsets = position.getPartitionPositions(topic);
             buffer.putInt(partitionOffsets.size());
             for (final Entry<Integer, Long> partitionOffset : partitionOffsets.entrySet()) {
                 buffer.putInt(partitionOffset.getKey());
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSessionStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSessionStore.java
index c171792..e6e2740 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSessionStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSessionStore.java
@@ -53,7 +53,14 @@ public class RocksDBSessionStore
     @Override
     public <R> QueryResult<R> query(final Query<R> query, final PositionBound positionBound,
         final boolean collectExecutionInfo) {
-        return StoreQueryUtils.handleBasicQueries(query, positionBound, collectExecutionInfo, this);
+        return StoreQueryUtils.handleBasicQueries(
+            query,
+            positionBound,
+            collectExecutionInfo,
+            this,
+            position,
+            stateStoreContext.taskId().partition()
+        );
     }
 
     @Override
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
index 0ab9be9..4f07a15 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
@@ -326,13 +326,18 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]>, BatchWritingS
     }
 
     @Override
-    public <R> QueryResult<R> query(final Query<R> query, final PositionBound positionBound,
-                                    final boolean collectExecutionInfo) {
+    public <R> QueryResult<R> query(
+        final Query<R> query,
+        final PositionBound positionBound,
+        final boolean collectExecutionInfo) {
+
         return StoreQueryUtils.handleBasicQueries(
-                query,
-                positionBound,
-                collectExecutionInfo,
-                this
+            query,
+            positionBound,
+            collectExecutionInfo,
+            this,
+            position,
+            context.taskId().partition()
         );
     }
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java
index efcde31..40415c2 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java
@@ -131,7 +131,14 @@ public class RocksDBWindowStore
     @Override
     public <R> QueryResult<R> query(final Query<R> query, final PositionBound positionBound,
         final boolean collectExecutionInfo) {
-        return StoreQueryUtils.handleBasicQueries(query, positionBound, collectExecutionInfo, this);
+        return StoreQueryUtils.handleBasicQueries(
+            query,
+            positionBound,
+            collectExecutionInfo,
+            this,
+            position,
+            stateStoreContext.taskId().partition()
+        );
     }
 
     private void maybeUpdateSeqnumForDups() {
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreQueryUtils.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreQueryUtils.java
index 6217877..2172b02 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreQueryUtils.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreQueryUtils.java
@@ -24,6 +24,8 @@ import org.apache.kafka.streams.query.PositionBound;
 import org.apache.kafka.streams.query.Query;
 import org.apache.kafka.streams.query.QueryResult;
 
+import java.util.Map;
+
 public final class StoreQueryUtils {
 
     // make this class uninstantiable
@@ -35,13 +37,19 @@ public final class StoreQueryUtils {
         final Query<R> query,
         final PositionBound positionBound,
         final boolean collectExecutionInfo,
-        final StateStore store) {
+        final StateStore store,
+        final Position position,
+        final int partition
+    ) {
 
         final QueryResult<R> result;
         final long start = collectExecutionInfo ? System.nanoTime() : -1L;
-        // TODO: position tracking
         if (query instanceof PingQuery) {
-            result = (QueryResult<R>) QueryResult.forResult(true);
+            if (!isPermitted(position, positionBound, partition)) {
+                result = QueryResult.notUpToBound(position, positionBound, partition);
+            } else {
+                result = (QueryResult<R>) QueryResult.forResult(true);
+            }
         } else {
             result = QueryResult.forUnknownQueryType(query, store);
         }
@@ -50,6 +58,7 @@ public final class StoreQueryUtils {
                 "Handled in " + store.getClass() + " in " + (System.nanoTime() - start) + "ns"
             );
         }
+        result.setPosition(position);
         return result;
     }
 
@@ -62,4 +71,28 @@ public final class StoreQueryUtils {
             position.withComponent(meta.topic(), meta.partition(), meta.offset());
         }
     }
+
+    public static boolean isPermitted(
+        final Position position,
+        final PositionBound positionBound,
+        final int partition
+    ) {
+        final Position bound = positionBound.position();
+        for (final String topic : bound.getTopics()) {
+            final Map<Integer, Long> partitionBounds = bound.getPartitionPositions(topic);
+            final Map<Integer, Long> seenPartitionPositions = position.getPartitionPositions(topic);
+            if (!partitionBounds.containsKey(partition)) {
+                // this topic isn't bounded for our partition, so just skip over it.
+            } else {
+                if (!seenPartitionPositions.containsKey(partition)) {
+                    // we haven't seen a partition that we have a bound for
+                    return false;
+                } else if (seenPartitionPositions.get(partition) < partitionBounds.get(partition)) {
+                    // our current position is behind the bound
+                    return false;
+                }
+            }
+        }
+        return true;
+    }
 }
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/ConsistencyVectorIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/ConsistencyVectorIntegrationTest.java
index 8c09450..4ef35c4 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/ConsistencyVectorIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/ConsistencyVectorIntegrationTest.java
@@ -144,8 +144,8 @@ public class ConsistencyVectorIntegrationTest {
         for (final TestingRocksDBStore store : supplier.stores) {
             if (store.getDbDir() != null) {
                 assertThat(store.getDbDir().toString().contains("/0_0/"), is(true));
-                assertThat(store.getPosition().getBound(INPUT_TOPIC_NAME), notNullValue());
-                assertThat(store.getPosition().getBound(INPUT_TOPIC_NAME), hasEntry(0, 99L));
+                assertThat(store.getPosition().getPartitionPositions(INPUT_TOPIC_NAME), notNullValue());
+                assertThat(store.getPosition().getPartitionPositions(INPUT_TOPIC_NAME), hasEntry(0, 99L));
                 count.incrementAndGet();
             }
         }
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/IQv2IntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/IQv2IntegrationTest.java
index 140b5ee..4b40a55 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/IQv2IntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/IQv2IntegrationTest.java
@@ -57,8 +57,10 @@ import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Before;
 import org.junit.BeforeClass;
+import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
 
 import java.io.IOException;
 import java.lang.reflect.Field;
@@ -97,6 +99,9 @@ public class IQv2IntegrationTest {
 
     private KafkaStreams kafkaStreams;
 
+    @Rule
+    public TestName testName = new TestName();
+
     @BeforeClass
     public static void before()
         throws InterruptedException, IOException, ExecutionException, TimeoutException {
@@ -221,7 +226,7 @@ public class IQv2IntegrationTest {
         final Object lock = stateLock.get(streamThread);
 
         // wait for the desired partitions to be assigned
-        IntegrationTestUtils.iqv2WaitForPartitionsOrGlobal(
+        IntegrationTestUtils.iqv2WaitForPartitions(
             kafkaStreams,
             inStore(STORE_NAME).withQuery(query),
             partitions
@@ -234,7 +239,7 @@ public class IQv2IntegrationTest {
             stateField.set(streamThread, State.PARTITIONS_ASSIGNED);
 
             final StateQueryResult<Boolean> result =
-                IntegrationTestUtils.iqv2WaitForPartitionsOrGlobal(
+                IntegrationTestUtils.iqv2WaitForPartitions(
                     kafkaStreams,
                     request,
                     partitions
@@ -266,7 +271,7 @@ public class IQv2IntegrationTest {
 
         kafkaStreams.start();
         final StateQueryResult<Boolean> result =
-            IntegrationTestUtils.iqv2WaitForPartitionsOrGlobal(kafkaStreams, request, partitions);
+            IntegrationTestUtils.iqv2WaitForResult(kafkaStreams, request);
 
         assertThat(result.getPartitionResults().keySet(), equalTo(partitions));
     }
@@ -280,7 +285,7 @@ public class IQv2IntegrationTest {
 
         kafkaStreams.start();
         final StateQueryResult<Boolean> result =
-            IntegrationTestUtils.iqv2WaitForPartitionsOrGlobal(kafkaStreams, request, partitions);
+            IntegrationTestUtils.iqv2WaitForPartitions(kafkaStreams, request, partitions);
 
         assertThat(result.getPartitionResults().keySet(), equalTo(partitions));
     }
@@ -407,7 +412,7 @@ public class IQv2IntegrationTest {
 
         kafkaStreams.start();
         final StateQueryResult<Boolean> result =
-            IntegrationTestUtils.iqv2WaitForPartitionsOrGlobal(kafkaStreams, request, partitions);
+            IntegrationTestUtils.iqv2WaitForResult(kafkaStreams, request);
 
         final QueryResult<Boolean> queryResult = result.getPartitionResults().get(partition);
         assertThat(queryResult.isFailure(), is(true));
@@ -419,8 +424,9 @@ public class IQv2IntegrationTest {
     }
 
 
-    private static Properties streamsConfiguration() {
-        final String safeTestName = IQv2IntegrationTest.class.getName();
+    private Properties streamsConfiguration() {
+        final String safeTestName = IntegrationTestUtils.safeUniqueTestName(getClass(), testName);
+
         final Properties config = new Properties();
         config.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, StreamsConfig.OPTIMIZE);
         config.put(StreamsConfig.APPLICATION_ID_CONFIG, "app-" + safeTestName);
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/IQv2StoreIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/IQv2StoreIntegrationTest.java
index 3a6adaf..308ab9c 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/IQv2StoreIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/IQv2StoreIntegrationTest.java
@@ -37,6 +37,7 @@ import org.apache.kafka.streams.kstream.SessionWindows;
 import org.apache.kafka.streams.kstream.TimeWindows;
 import org.apache.kafka.streams.query.FailureReason;
 import org.apache.kafka.streams.query.Position;
+import org.apache.kafka.streams.query.PositionBound;
 import org.apache.kafka.streams.query.Query;
 import org.apache.kafka.streams.query.QueryResult;
 import org.apache.kafka.streams.query.StateQueryRequest;
@@ -85,7 +86,6 @@ import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.is;
 import static org.hamcrest.Matchers.matchesPattern;
 import static org.hamcrest.Matchers.not;
-import static org.hamcrest.Matchers.nullValue;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 
 @Category({IntegrationTest.class})
@@ -248,6 +248,7 @@ public class IQv2StoreIntegrationTest {
     public static void before()
         throws InterruptedException, IOException, ExecutionException, TimeoutException {
         CLUSTER.start();
+        CLUSTER.deleteAllTopicsAndWait(60 * 1000L);
         final int partitions = 2;
         CLUSTER.createTopic(INPUT_TOPIC_NAME, partitions, 1);
 
@@ -295,8 +296,14 @@ public class IQv2StoreIntegrationTest {
 
     @Before
     public void beforeTest() {
-        final StreamsBuilder builder = new StreamsBuilder();
         final StoreSupplier<?> supplier = storeToTest.supplier();
+        final Properties streamsConfig = streamsConfiguration(
+            cache,
+            log,
+            storeToTest.name()
+        );
+
+        final StreamsBuilder builder = new StreamsBuilder();
         if (supplier instanceof KeyValueBytesStoreSupplier) {
             final Materialized<Integer, Integer, KeyValueStore<Bytes, byte[]>> materialized =
                 Materialized.as((KeyValueBytesStoreSupplier) supplier);
@@ -389,13 +396,10 @@ public class IQv2StoreIntegrationTest {
 
         // Don't need to wait for running, since tests can use iqv2 to wait until they
         // get a valid response.
+
         kafkaStreams =
             IntegrationTestUtils.getStartedStreams(
-                streamsConfiguration(
-                    cache,
-                    log,
-                    supplier.getClass().getSimpleName()
-                ),
+                streamsConfig,
                 builder,
                 true
             );
@@ -414,10 +418,31 @@ public class IQv2StoreIntegrationTest {
 
     @Test
     public void verifyStore() {
-        shouldRejectUnknownQuery();
-        shouldHandlePingQuery();
-        shouldCollectExecutionInfo();
-        shouldCollectExecutionInfoUnderFailure();
+        if (storeToTest.global()) {
+            // See KAFKA-13523
+            globalShouldRejectAllQueries();
+        } else {
+            shouldRejectUnknownQuery();
+            shouldHandlePingQuery();
+            shouldCollectExecutionInfo();
+            shouldCollectExecutionInfoUnderFailure();
+        }
+    }
+
+    private void globalShouldRejectAllQueries() {
+        // See KAFKA-13523
+
+        final PingQuery query = new PingQuery();
+        final StateQueryRequest<Boolean> request = inStore(STORE_NAME).withQuery(query);
+
+        final StateQueryResult<Boolean> result = kafkaStreams.query(request);
+
+        assertThat(result.getGlobalResult().isFailure(), is(true));
+        assertThat(result.getGlobalResult().getFailureReason(),
+            is(FailureReason.UNKNOWN_QUERY_TYPE));
+        assertThat(result.getGlobalResult().getFailureMessage(),
+            is("Global stores do not yet support the KafkaStreams#query API."
+                + " Use KafkaStreams#store instead."));
     }
 
     public void shouldRejectUnknownQuery() {
@@ -427,7 +452,7 @@ public class IQv2StoreIntegrationTest {
         final Set<Integer> partitions = mkSet(0, 1);
 
         final StateQueryResult<Void> result =
-            IntegrationTestUtils.iqv2WaitForPartitionsOrGlobal(kafkaStreams, request, partitions);
+            IntegrationTestUtils.iqv2WaitForPartitions(kafkaStreams, request, partitions);
 
         makeAssertions(
             partitions,
@@ -435,7 +460,6 @@ public class IQv2StoreIntegrationTest {
             queryResult -> {
                 assertThat(queryResult.isFailure(), is(true));
                 assertThat(queryResult.isSuccess(), is(false));
-                assertThat(queryResult.getPosition(), is(nullValue()));
                 assertThat(queryResult.getFailureReason(),
                     is(FailureReason.UNKNOWN_QUERY_TYPE));
                 assertThat(queryResult.getFailureMessage(),
@@ -456,12 +480,18 @@ public class IQv2StoreIntegrationTest {
     public void shouldHandlePingQuery() {
 
         final PingQuery query = new PingQuery();
-        final StateQueryRequest<Boolean> request =
-            inStore(STORE_NAME).withQuery(query);
         final Set<Integer> partitions = mkSet(0, 1);
+        final StateQueryRequest<Boolean> request =
+            inStore(STORE_NAME)
+                .withQuery(query)
+                .withPartitions(partitions)
+                .withPositionBound(PositionBound.at(INPUT_POSITION));
 
         final StateQueryResult<Boolean> result =
-            IntegrationTestUtils.iqv2WaitForPartitionsOrGlobal(kafkaStreams, request, partitions);
+            IntegrationTestUtils.iqv2WaitForResult(
+                kafkaStreams,
+                request
+            );
 
         makeAssertions(
             partitions,
@@ -473,9 +503,6 @@ public class IQv2StoreIntegrationTest {
                 }
                 assertThat(queryResult.isSuccess(), is(true));
 
-                // TODO: position not implemented
-                assertThat(queryResult.getPosition(), is(nullValue()));
-
                 assertThrows(IllegalArgumentException.class, queryResult::getFailureReason);
                 assertThrows(IllegalArgumentException.class, queryResult::getFailureMessage);
 
@@ -483,17 +510,25 @@ public class IQv2StoreIntegrationTest {
 
                 assertThat(queryResult.getExecutionInfo(), is(empty()));
             });
+        assertThat(result.getPosition(), is(INPUT_POSITION));
     }
 
     public void shouldCollectExecutionInfo() {
 
         final PingQuery query = new PingQuery();
-        final StateQueryRequest<Boolean> request =
-            inStore(STORE_NAME).withQuery(query).enableExecutionInfo();
         final Set<Integer> partitions = mkSet(0, 1);
+        final StateQueryRequest<Boolean> request =
+            inStore(STORE_NAME)
+                .withQuery(query)
+                .enableExecutionInfo()
+                .withPartitions(partitions)
+                .withPositionBound(PositionBound.at(INPUT_POSITION));
 
         final StateQueryResult<Boolean> result =
-            IntegrationTestUtils.iqv2WaitForPartitionsOrGlobal(kafkaStreams, request, partitions);
+            IntegrationTestUtils.iqv2WaitForResult(
+                kafkaStreams,
+                request
+            );
 
         makeAssertions(
             partitions,
@@ -505,12 +540,19 @@ public class IQv2StoreIntegrationTest {
     public void shouldCollectExecutionInfoUnderFailure() {
 
         final UnknownQuery query = new UnknownQuery();
-        final StateQueryRequest<Void> request =
-            inStore(STORE_NAME).withQuery(query).enableExecutionInfo();
         final Set<Integer> partitions = mkSet(0, 1);
+        final StateQueryRequest<Void> request =
+            inStore(STORE_NAME)
+                .withQuery(query)
+                .enableExecutionInfo()
+                .withPartitions(partitions)
+                .withPositionBound(PositionBound.at(INPUT_POSITION));
 
         final StateQueryResult<Void> result =
-            IntegrationTestUtils.iqv2WaitForPartitionsOrGlobal(kafkaStreams, request, partitions);
+            IntegrationTestUtils.iqv2WaitForResult(
+                kafkaStreams,
+                request
+            );
 
         makeAssertions(
             partitions,
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
index 6cfc5d4..202fd1f 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
@@ -51,6 +51,8 @@ import org.apache.kafka.streams.processor.internals.StreamThread;
 import org.apache.kafka.streams.processor.internals.ThreadStateTransitionValidator;
 import org.apache.kafka.streams.processor.internals.assignment.AssignorConfiguration.AssignmentListener;
 import org.apache.kafka.streams.processor.internals.namedtopology.KafkaStreamsNamedTopologyWrapper;
+import org.apache.kafka.streams.query.FailureReason;
+import org.apache.kafka.streams.query.QueryResult;
 import org.apache.kafka.streams.query.StateQueryRequest;
 import org.apache.kafka.streams.query.StateQueryResult;
 import org.apache.kafka.streams.state.QueryableStoreType;
@@ -91,6 +93,7 @@ import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 import java.util.stream.Collectors;
 
+import static org.apache.kafka.common.utils.Utils.sleep;
 import static org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout;
 import static org.apache.kafka.test.TestUtils.waitForCondition;
 import static org.hamcrest.MatcherAssert.assertThat;
@@ -116,7 +119,7 @@ public class IntegrationTestUtils {
      * Once position bounding is generally supported, we should migrate tests to wait on the
      * expected response position.
      */
-    public static <R> StateQueryResult<R> iqv2WaitForPartitionsOrGlobal(
+    public static <R> StateQueryResult<R> iqv2WaitForPartitions(
         final KafkaStreams kafkaStreams,
         final StateQueryRequest<R> request,
         final Set<Integer> partitions) {
@@ -125,22 +128,81 @@ public class IntegrationTestUtils {
         final long deadline = start + DEFAULT_TIMEOUT;
 
         do {
+            if (Thread.currentThread().isInterrupted()) {
+                fail("Test was interrupted.");
+            }
             final StateQueryResult<R> result = kafkaStreams.query(request);
-            if (result.getPartitionResults().keySet().containsAll(partitions)
-                || result.getGlobalResult() != null) {
+            if (result.getPartitionResults().keySet().containsAll(partitions)) {
                 return result;
             } else {
-                try {
-                    Thread.sleep(100L);
-                } catch (final InterruptedException e) {
-                    throw new RuntimeException(e);
-                }
+                sleep(100L);
             }
         } while (System.currentTimeMillis() < deadline);
 
         throw new TimeoutException("The query never returned the desired partitions");
     }
 
+    /**
+     * Repeatedly runs the query until the response is valid and then return the response.
+     * <p>
+     * Validity in this case means that the response position is up to the specified bound.
+     * <p>
+     * Once position bounding is generally supported, we should migrate tests to wait on the
+     * expected response position.
+     */
+    public static <R> StateQueryResult<R> iqv2WaitForResult(
+        final KafkaStreams kafkaStreams,
+        final StateQueryRequest<R> request) {
+
+        final long start = System.currentTimeMillis();
+        final long deadline = start + DEFAULT_TIMEOUT;
+
+        StateQueryResult<R> result;
+        do {
+            if (Thread.currentThread().isInterrupted()) {
+                fail("Test was interrupted.");
+            }
+
+            result = kafkaStreams.query(request);
+            final LinkedList<QueryResult<R>> allResults = getAllResults(result);
+
+            if (allResults.isEmpty()) {
+                sleep(100L);
+            } else {
+                final boolean needToWait = allResults
+                    .stream()
+                    .anyMatch(IntegrationTestUtils::needToWait);
+                if (needToWait) {
+                    sleep(100L);
+                } else {
+                    return result;
+                }
+            }
+        } while (System.currentTimeMillis() < deadline);
+
+        throw new TimeoutException(
+            "The query never returned within the bound. Last result: "
+            + result
+        );
+    }
+
+    private static <R> LinkedList<QueryResult<R>> getAllResults(
+        final StateQueryResult<R> result) {
+        final LinkedList<QueryResult<R>> allResults =
+            new LinkedList<>(result.getPartitionResults().values());
+        if (result.getGlobalResult() != null) {
+            allResults.add(result.getGlobalResult());
+        }
+        return allResults;
+    }
+
+    private static <R> boolean needToWait(final QueryResult<R> queryResult) {
+        return queryResult.isFailure()
+            && (
+            FailureReason.NOT_UP_TO_BOUND.equals(queryResult.getFailureReason())
+                || FailureReason.NOT_PRESENT.equals(queryResult.getFailureReason()));
+    }
+
     /*
      * Records state transition for StreamThread
      */
diff --git a/streams/src/test/java/org/apache/kafka/streams/query/PositionBoundTest.java b/streams/src/test/java/org/apache/kafka/streams/query/PositionBoundTest.java
index fd881bc..57aed4a 100644
--- a/streams/src/test/java/org/apache/kafka/streams/query/PositionBoundTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/query/PositionBoundTest.java
@@ -50,9 +50,9 @@ public class PositionBoundTest {
     }
 
     @Test
-    public void unboundedShouldThrowOnPosition() {
+    public void unboundedShouldReturnEmptyPosition() {
         final PositionBound bound = PositionBound.unbounded();
-        assertThrows(IllegalArgumentException.class, bound::position);
+        assertThat(bound.position(), equalTo(Position.emptyPosition()));
     }
 
     @Test
diff --git a/streams/src/test/java/org/apache/kafka/streams/query/PositionTest.java b/streams/src/test/java/org/apache/kafka/streams/query/PositionTest.java
index ec29993..eafc28e 100644
--- a/streams/src/test/java/org/apache/kafka/streams/query/PositionTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/query/PositionTest.java
@@ -47,14 +47,14 @@ public class PositionTest {
 
         final Position position = Position.fromMap(map);
         assertThat(position.getTopics(), equalTo(mkSet("topic", "topic1")));
-        assertThat(position.getBound("topic"), equalTo(mkMap(mkEntry(0, 5L))));
+        assertThat(position.getPartitionPositions("topic"), equalTo(mkMap(mkEntry(0, 5L))));
 
         // Should be a copy of the constructor map
 
         map.get("topic1").put(99, 99L);
 
         // so the position is still the original one
-        assertThat(position.getBound("topic1"), equalTo(mkMap(
+        assertThat(position.getPartitionPositions("topic1"), equalTo(mkMap(
             mkEntry(0, 5L),
             mkEntry(7, 0L)
         )));
@@ -85,13 +85,13 @@ public class PositionTest {
         final Position merged = position.merge(position1);
 
         assertThat(merged.getTopics(), equalTo(mkSet("topic", "topic1", "topic2")));
-        assertThat(merged.getBound("topic"), equalTo(mkMap(mkEntry(0, 7L))));
-        assertThat(merged.getBound("topic1"), equalTo(mkMap(
+        assertThat(merged.getPartitionPositions("topic"), equalTo(mkMap(mkEntry(0, 7L))));
+        assertThat(merged.getPartitionPositions("topic1"), equalTo(mkMap(
             mkEntry(0, 5L),
             mkEntry(7, 0L),
             mkEntry(8, 1L)
         )));
-        assertThat(merged.getBound("topic2"), equalTo(mkMap(mkEntry(9, 5L))));
+        assertThat(merged.getPartitionPositions("topic2"), equalTo(mkMap(mkEntry(9, 5L))));
     }
 
     @Test
@@ -99,9 +99,9 @@ public class PositionTest {
         final Position position = Position.emptyPosition();
         position.withComponent("topic", 3, 5L);
         position.withComponent("topic", 3, 4L);
-        assertThat(position.getBound("topic").get(3), equalTo(5L));
+        assertThat(position.getPartitionPositions("topic").get(3), equalTo(5L));
         position.withComponent("topic", 3, 6L);
-        assertThat(position.getBound("topic").get(3), equalTo(6L));
+        assertThat(position.getPartitionPositions("topic").get(3), equalTo(6L));
     }
 
     @Test
@@ -123,21 +123,21 @@ public class PositionTest {
 
         // copy has not changed
         assertThat(copy.getTopics(), equalTo(mkSet("topic", "topic1")));
-        assertThat(copy.getBound("topic"), equalTo(mkMap(mkEntry(0, 5L))));
-        assertThat(copy.getBound("topic1"), equalTo(mkMap(
+        assertThat(copy.getPartitionPositions("topic"), equalTo(mkMap(mkEntry(0, 5L))));
+        assertThat(copy.getPartitionPositions("topic1"), equalTo(mkMap(
             mkEntry(0, 5L),
             mkEntry(7, 0L)
         )));
 
         // original has changed
         assertThat(position.getTopics(), equalTo(mkSet("topic", "topic1", "topic2")));
-        assertThat(position.getBound("topic"), equalTo(mkMap(mkEntry(0, 6L))));
-        assertThat(position.getBound("topic1"), equalTo(mkMap(
+        assertThat(position.getPartitionPositions("topic"), equalTo(mkMap(mkEntry(0, 6L))));
+        assertThat(position.getPartitionPositions("topic1"), equalTo(mkMap(
             mkEntry(0, 5L),
             mkEntry(7, 0L),
             mkEntry(8, 1L)
         )));
-        assertThat(position.getBound("topic2"), equalTo(mkMap(mkEntry(2, 4L))));
+        assertThat(position.getPartitionPositions("topic2"), equalTo(mkMap(mkEntry(2, 4L))));
     }
 
     @Test
@@ -153,8 +153,8 @@ public class PositionTest {
         final Position merged = position.merge(null);
 
         assertThat(merged.getTopics(), equalTo(mkSet("topic", "topic1")));
-        assertThat(merged.getBound("topic"), equalTo(mkMap(mkEntry(0, 5L))));
-        assertThat(merged.getBound("topic1"), equalTo(mkMap(
+        assertThat(merged.getPartitionPositions("topic"), equalTo(mkMap(mkEntry(0, 5L))));
+        assertThat(merged.getPartitionPositions("topic1"), equalTo(mkMap(
             mkEntry(0, 5L),
             mkEntry(7, 0L)
         )));
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStoreTest.java
index c826991..32e3386 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStoreTest.java
@@ -592,8 +592,8 @@ public abstract class AbstractRocksDBSegmentedBytesStoreTest<S extends Segment>
         final List<KeyValue<Windowed<String>, Long>> results = toList(bytesStore.all());
         assertEquals(expected, results);
         assertThat(bytesStore.getPosition(), Matchers.notNullValue());
-        assertThat(bytesStore.getPosition().getBound(""), Matchers.notNullValue());
-        assertThat(bytesStore.getPosition().getBound(""), hasEntry(0, 3L));
+        assertThat(bytesStore.getPosition().getPartitionPositions(""), Matchers.notNullValue());
+        assertThat(bytesStore.getPosition().getPartitionPositions(""), hasEntry(0, 3L));
     }
 
     @Test
@@ -629,10 +629,10 @@ public abstract class AbstractRocksDBSegmentedBytesStoreTest<S extends Segment>
         final List<KeyValue<Windowed<String>, Long>> results = toList(bytesStore.all());
         assertEquals(expected, results);
         assertThat(bytesStore.getPosition(), Matchers.notNullValue());
-        assertThat(bytesStore.getPosition().getBound("A"), Matchers.notNullValue());
-        assertThat(bytesStore.getPosition().getBound("A"), hasEntry(0, 3L));
-        assertThat(bytesStore.getPosition().getBound("B"), Matchers.notNullValue());
-        assertThat(bytesStore.getPosition().getBound("B"), hasEntry(0, 2L));
+        assertThat(bytesStore.getPosition().getPartitionPositions("A"), Matchers.notNullValue());
+        assertThat(bytesStore.getPosition().getPartitionPositions("A"), hasEntry(0, 3L));
+        assertThat(bytesStore.getPosition().getPartitionPositions("B"), Matchers.notNullValue());
+        assertThat(bytesStore.getPosition().getPartitionPositions("B"), hasEntry(0, 2L));
     }
 
     @Test
@@ -665,7 +665,7 @@ public abstract class AbstractRocksDBSegmentedBytesStoreTest<S extends Segment>
         final List<KeyValue<Windowed<String>, Long>> results = toList(bytesStore.all());
         assertEquals(expected, results);
         assertThat(bytesStore.getPosition(), Matchers.notNullValue());
-        assertThat(bytesStore.getPosition().getBound("A"), hasEntry(0, 2L));
+        assertThat(bytesStore.getPosition().getPartitionPositions("A"), hasEntry(0, 2L));
     }
 
     @Test
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStoreTest.java
index b5e92f0..0d577e6 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStoreTest.java
@@ -270,8 +270,8 @@ public class ChangeLoggingKeyValueBytesStoreTest {
         final Header vectorHeader = collector.collected().get(0).headers().lastHeader(ChangelogRecordDeserializationHelper.CHANGELOG_POSITION_HEADER_KEY);
         assertThat(vectorHeader, is(notNullValue()));
         final Position position = PositionSerde.deserialize(ByteBuffer.wrap(vectorHeader.value()));
-        assertThat(position.getBound(INPUT_TOPIC_NAME), is(notNullValue()));
-        assertThat(position.getBound(INPUT_TOPIC_NAME), hasEntry(0, 100L));
+        assertThat(position.getPartitionPositions(INPUT_TOPIC_NAME), is(notNullValue()));
+        assertThat(position.getPartitionPositions(INPUT_TOPIC_NAME), hasEntry(0, 100L));
 
     }
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java
index daa4572..3667c0e 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java
@@ -1003,8 +1003,8 @@ public class RocksDBStoreTest extends AbstractKeyValueStoreTest {
                         rocksDBStore.get(new Bytes(stringSerializer.serialize(null, "3")))));
 
         assertThat(rocksDBStore.getPosition(), Matchers.notNullValue());
-        assertThat(rocksDBStore.getPosition().getBound(""), Matchers.notNullValue());
-        assertThat(rocksDBStore.getPosition().getBound(""), hasEntry(0, 3L));
+        assertThat(rocksDBStore.getPosition().getPartitionPositions(""), Matchers.notNullValue());
+        assertThat(rocksDBStore.getPosition().getPartitionPositions(""), hasEntry(0, 3L));
     }
 
     @Test
@@ -1040,10 +1040,10 @@ public class RocksDBStoreTest extends AbstractKeyValueStoreTest {
                         rocksDBStore.get(new Bytes(stringSerializer.serialize(null, "3")))));
 
         assertThat(rocksDBStore.getPosition(), Matchers.notNullValue());
-        assertThat(rocksDBStore.getPosition().getBound("A"), Matchers.notNullValue());
-        assertThat(rocksDBStore.getPosition().getBound("A"), hasEntry(0, 3L));
-        assertThat(rocksDBStore.getPosition().getBound("B"), Matchers.notNullValue());
-        assertThat(rocksDBStore.getPosition().getBound("B"), hasEntry(0, 2L));
+        assertThat(rocksDBStore.getPosition().getPartitionPositions("A"), Matchers.notNullValue());
+        assertThat(rocksDBStore.getPosition().getPartitionPositions("A"), hasEntry(0, 3L));
+        assertThat(rocksDBStore.getPosition().getPartitionPositions("B"), Matchers.notNullValue());
+        assertThat(rocksDBStore.getPosition().getPartitionPositions("B"), hasEntry(0, 2L));
     }
 
     @Test
@@ -1067,7 +1067,7 @@ public class RocksDBStoreTest extends AbstractKeyValueStoreTest {
                 rocksDBStore.get(new Bytes(stringSerializer.serialize(null, "1")))));
 
         assertThat(rocksDBStore.getPosition(), Matchers.notNullValue());
-        assertThat(rocksDBStore.getPosition().getBound("A"), hasEntry(0, 2L));
+        assertThat(rocksDBStore.getPosition().getPartitionPositions("A"), hasEntry(0, 2L));
     }
 
     @Test